Changeset - b2634df81a11
[Not reviewed]
default
0 4 0
Mads Kiilerich - 7 years ago 2018-12-29 17:48:07
mads@kiilerich.com
auth: explicit user permission should not blindly overrule permissions through user groups

Before, explicit permissions of a user could shadow higher permissions that
would otherwise be obtained through a group the user is member of.
That was confusing and fragile: *removing* a permission could then suddenly
give a user *more* permissions.

Instead, change the flag for controlling internal permission computation to
*not* use "explicit". Permissions will then add up, no matter if they are
explicit or through groups.

The change in auth.py is small, but read the body of __get_perms to see the
actual impact ... and also the clean-up changeset that will come next.

This might in some cases be a behaviour change and give users more access ...
but it will probably only give the user that was intended. This change can thus
be seen as a bugfix.

Some tests assumed the old behaviour. Not for good reasons, but just because
that is how they were written. These tests are updated to expect the new
behaviour, and it has been reviewed that it makes sense.

Note that this 'explicit' flag mostly is for repo permissions and independent
of the 'user_inherit_default_permissions' that just was removed and is about
global permissions.
4 files changed with 44 insertions and 34 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
@@ -178,769 +178,769 @@ def _cached_perms_data(user_id, user_is_
 
            p = 'group.admin'
 
            permissions[GK][rg_k] = p
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            permissions[UK][u_k] = p
 
        return permissions
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # default global permissions taken from the default user
 
    default_global_perms = UserToPerm.query() \
 
        .filter(UserToPerm.user_id == default_user_id) \
 
        .options(joinedload(UserToPerm.permission))
 

	
 
    for perm in default_global_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # defaults for repositories, taken from default user
 
    for perm in default_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        if perm.Repository.private and not (perm.Repository.owner_id == user_id):
 
            # disable defaults for private repos,
 
            p = 'repository.none'
 
        elif perm.Repository.owner_id == user_id:
 
            # set admin if owner
 
            p = 'repository.admin'
 
        else:
 
            p = perm.Permission.permission_name
 

	
 
        permissions[RK][r_k] = p
 

	
 
    # defaults for repository groups taken from default user permission
 
    # on given group
 
    for perm in default_repo_groups_perms:
 
        rg_k = perm.UserRepoGroupToPerm.group.group_name
 
        p = perm.Permission.permission_name
 
        permissions[GK][rg_k] = p
 

	
 
    # defaults for user groups taken from default user permission
 
    # on given user group
 
    for perm in default_user_group_perms:
 
        u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
        p = perm.Permission.permission_name
 
        permissions[UK][u_k] = p
 

	
 
    #======================================================================
 
    # !! Augment GLOBALS with user permissions if any found !!
 
    #======================================================================
 

	
 
    # USER GROUPS comes first
 
    # user group global permissions
 
    user_perms_from_users_groups = Session().query(UserGroupToPerm) \
 
        .options(joinedload(UserGroupToPerm.permission)) \
 
        .join((UserGroupMember, UserGroupToPerm.users_group_id ==
 
               UserGroupMember.users_group_id)) \
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .join((UserGroup, UserGroupMember.users_group_id ==
 
               UserGroup.users_group_id)) \
 
        .filter(UserGroup.users_group_active == True) \
 
        .order_by(UserGroupToPerm.users_group_id) \
 
        .all()
 
    # need to group here by groups since user can be in more than
 
    # one group
 
    _grouped = [[x, list(y)] for x, y in
 
                itertools.groupby(user_perms_from_users_groups,
 
                                  lambda x:x.users_group)]
 
    for gr, perms in _grouped:
 
        for perm in perms:
 
            permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # user specific global permissions
 
    user_perms = Session().query(UserToPerm) \
 
            .options(joinedload(UserToPerm.permission)) \
 
            .filter(UserToPerm.user_id == user_id).all()
 

	
 
    for perm in user_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # for each kind of global permissions, only keep the one with heighest weight
 
    kind_max_perm = {}
 
    for perm in sorted(permissions[GLOBAL], key=lambda n: PERM_WEIGHTS[n]):
 
        kind = perm.rsplit('.', 1)[0]
 
        kind_max_perm[kind] = perm
 
    permissions[GLOBAL] = set(kind_max_perm.values())
 
    ## END GLOBAL PERMISSIONS
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORIES !!
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository and
 
    # fill in his permission from it.
 
    #======================================================================
 

	
 
    # user group for repositories permissions
 
    user_repo_perms_from_users_groups = \
 
     Session().query(UserGroupRepoToPerm, Permission, Repository,) \
 
        .join((Repository, UserGroupRepoToPerm.repository_id ==
 
               Repository.repo_id)) \
 
        .join((Permission, UserGroupRepoToPerm.permission_id ==
 
               Permission.permission_id)) \
 
        .join((UserGroup, UserGroupRepoToPerm.users_group_id ==
 
               UserGroup.users_group_id)) \
 
        .filter(UserGroup.users_group_active == True) \
 
        .join((UserGroupMember, UserGroupRepoToPerm.users_group_id ==
 
               UserGroupMember.users_group_id)) \
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_repo_perms_from_users_groups:
 
        r_k = perm.UserGroupRepoToPerm.repository.repo_name
 
        multiple_counter[r_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[RK][r_k]
 

	
 
        if perm.Repository.owner_id == user_id:
 
            # set admin if owner
 
            p = 'repository.admin'
 
        else:
 
            if multiple_counter[r_k] > 1:
 
                p = _choose_perm(p, cur_perm)
 
        permissions[RK][r_k] = p
 

	
 
    # user explicit permissions for repositories, overrides any specified
 
    # by the group permission
 
    user_repo_perms = Permission.get_default_perms(user_id)
 
    for perm in user_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        cur_perm = permissions[RK][r_k]
 
        # set admin if owner
 
        if perm.Repository.owner_id == user_id:
 
            p = 'repository.admin'
 
        else:
 
            p = perm.Permission.permission_name
 
            if not explicit:
 
                p = _choose_perm(p, cur_perm)
 
        permissions[RK][r_k] = p
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORY GROUPS !!
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository groups and
 
    # fill in his permission from it.
 
    #======================================================================
 
    # user group for repo groups permissions
 
    user_repo_group_perms_from_users_groups = \
 
     Session().query(UserGroupRepoGroupToPerm, Permission, RepoGroup) \
 
     .join((RepoGroup, UserGroupRepoGroupToPerm.group_id == RepoGroup.group_id)) \
 
     .join((Permission, UserGroupRepoGroupToPerm.permission_id
 
            == Permission.permission_id)) \
 
     .join((UserGroup, UserGroupRepoGroupToPerm.users_group_id ==
 
            UserGroup.users_group_id)) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .join((UserGroupMember, UserGroupRepoGroupToPerm.users_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_repo_group_perms_from_users_groups:
 
        g_k = perm.UserGroupRepoGroupToPerm.group.group_name
 
        multiple_counter[g_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[GK][g_k]
 
        if multiple_counter[g_k] > 1:
 
            p = _choose_perm(p, cur_perm)
 
        permissions[GK][g_k] = p
 

	
 
    # user explicit permissions for repository groups
 
    user_repo_groups_perms = Permission.get_default_group_perms(user_id)
 
    for perm in user_repo_groups_perms:
 
        rg_k = perm.UserRepoGroupToPerm.group.group_name
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[GK][rg_k]
 
        if not explicit:
 
            p = _choose_perm(p, cur_perm)
 
        permissions[GK][rg_k] = p
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR USER GROUPS !!
 
    #======================================================================
 
    # user group for user group permissions
 
    user_group_user_groups_perms = \
 
     Session().query(UserGroupUserGroupToPerm, Permission, UserGroup) \
 
     .join((UserGroup, UserGroupUserGroupToPerm.target_user_group_id
 
            == UserGroup.users_group_id)) \
 
     .join((Permission, UserGroupUserGroupToPerm.permission_id
 
            == Permission.permission_id)) \
 
     .join((UserGroupMember, UserGroupUserGroupToPerm.user_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .join((UserGroup, UserGroupMember.users_group_id ==
 
            UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_group_user_groups_perms:
 
        g_k = perm.UserGroupUserGroupToPerm.target_user_group.users_group_name
 
        multiple_counter[g_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[UK][g_k]
 
        if multiple_counter[g_k] > 1:
 
            p = _choose_perm(p, cur_perm)
 
        permissions[UK][g_k] = p
 

	
 
    # user explicit permission for user groups
 
    user_user_groups_perms = Permission.get_default_user_group_perms(user_id)
 
    for perm in user_user_groups_perms:
 
        u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[UK][u_k]
 
        if not explicit:
 
            p = _choose_perm(p, cur_perm)
 
        permissions[UK][u_k] = p
 

	
 
    return permissions
 

	
 

	
 
def allowed_api_access(controller_name, whitelist=None, api_key=None):
 
    """
 
    Check if given controller_name is in whitelist API access
 
    """
 
    if not whitelist:
 
        from kallithea import CONFIG
 
        whitelist = aslist(CONFIG.get('api_access_controllers_whitelist'),
 
                           sep=',')
 
        log.debug('whitelist of API access is: %s', whitelist)
 
    api_access_valid = controller_name in whitelist
 
    if api_access_valid:
 
        log.debug('controller:%s is in API whitelist', controller_name)
 
    else:
 
        msg = 'controller: %s is *NOT* in API whitelist' % (controller_name)
 
        if api_key:
 
            # if we use API key and don't have access it's a warning
 
            log.warning(msg)
 
        else:
 
            log.debug(msg)
 
    return api_access_valid
 

	
 

	
 
class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
    but is also used as a generic user information data structure in
 
    parts of the code, e.g. user management.
 

	
 
    Constructed from a database `User` object, a user ID or cookie dict,
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users and the constructor
 
    sets the `is_authenticated` field to False. It's up to other parts
 
    of the code to check e.g. if a supplied password is correct, and if
 
    so, set `is_authenticated` to True.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
    "no user". On the other hand, if the default user is disabled (and
 
    there is no login information), we instead get "no user"; this should
 
    only happen on the login page (as all other requests are redirected).
 

	
 
    `is_default_user` specifically checks if the AuthUser is the user named
 
    "default". Use `is_anonymous` to check for both "default" and "no user".
 
    """
 

	
 
    def __init__(self, user_id=None, dbuser=None, authenticating_api_key=None,
 
            is_external_auth=False):
 

	
 
        self.is_authenticated = False
 
        self.is_external_auth = is_external_auth
 
        self.authenticating_api_key = authenticating_api_key
 

	
 
        # These attributes will be overridden by fill_data, below, unless the
 
        # requested user cannot be found and the default anonymous user is
 
        # not enabled.
 
        self.user_id = None
 
        self.username = None
 
        self.api_key = None
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.admin = False
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            log.debug('Auth User lookup by USER ID %s', user_id)
 
            dbuser = UserModel().get(user_id)
 
        else:
 
            # Note: dbuser is allowed to be None.
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
        is_user_loaded = self._fill_data(dbuser)
 

	
 
        # If user cannot be found, try falling back to anonymous.
 
        if is_user_loaded:
 
            assert dbuser is not None
 
            self.is_default_user = dbuser.is_default_user
 
        else:
 
            default_user = User.get_default_user(cache=True)
 
            is_user_loaded = self._fill_data(default_user)
 
            self.is_default_user = is_user_loaded
 

	
 
        self.is_anonymous = not is_user_loaded or self.is_default_user
 

	
 
        if not self.username:
 
            self.username = 'None'
 

	
 
        log.debug('Auth User is now %s', self)
 

	
 
    def _fill_data(self, dbuser):
 
        """
 
        Copies database fields from a `db.User` to this `AuthUser`. Does
 
        not copy `api_keys` and `permissions` attributes.
 

	
 
        Checks that `dbuser` is `active` (and not None) before copying;
 
        returns True on success.
 
        """
 
        if dbuser is not None and dbuser.active:
 
            log.debug('filling %s data', dbuser)
 
            for k, v in dbuser.get_dict().iteritems():
 
                assert k not in ['api_keys', 'permissions']
 
                setattr(self, k, v)
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def permissions(self):
 
        return self.__get_perms(user=self, cache=False)
 

	
 
    def has_repository_permission_level(self, repo_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['repository.read', 'repository.write', 'repository.admin'],
 
            'write': ['repository.write', 'repository.admin'],
 
            'admin': ['repository.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories'].get(repo_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
 
            self.username, level, repo_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['group.read', 'group.write', 'group.admin'],
 
            'write': ['group.write', 'group.admin'],
 
            'admin': ['group.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories_groups'].get(repo_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
 
            self.username, level, repo_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_user_group_permission_level(self, user_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
 
            'write': ['usergroup.write', 'usergroup.admin'],
 
            'admin': ['usergroup.admin'],
 
        }[level]
 
        actual_perm = self.permissions['user_groups'].get(user_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
 
            self.username, level, user_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    @property
 
    def api_keys(self):
 
        return self._get_api_keys()
 

	
 
    def __get_perms(self, user, explicit=True, cache=False):
 
    def __get_perms(self, user, explicit=False, cache=False):
 
        """
 
        Fills user permission attribute with permissions taken from database
 
        works for permissions given for repositories, and for permissions that
 
        are granted to groups
 

	
 
        :param user: `AuthUser` instance
 
        :param explicit: In case there are permissions both for user and a group
 
            that user is part of, explicit flag will define if user will
 
            explicitly override permissions from group, if it's False it will
 
            compute the decision
 
        """
 
        user_id = user.user_id
 
        user_is_admin = user.is_admin
 

	
 
        log.debug('Getting PERMISSION tree')
 
        compute = conditional_cache('short_term', 'cache_desc',
 
                                    condition=cache, func=_cached_perms_data)
 
        return compute(user_id, user_is_admin, explicit)
 

	
 
    def _get_api_keys(self):
 
        api_keys = [self.api_key]
 
        for api_key in UserApiKeys.query() \
 
                .filter_by(user_id=self.user_id, is_expired=False):
 
            api_keys.append(api_key.api_key)
 

	
 
        return api_keys
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @property
 
    def repositories_admin(self):
 
        """
 
        Returns list of repositories you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories'].iteritems()
 
                if x[1] == 'repository.admin']
 

	
 
    @property
 
    def repository_groups_admin(self):
 
        """
 
        Returns list of repository groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories_groups'].iteritems()
 
                if x[1] == 'group.admin']
 

	
 
    @property
 
    def user_groups_admin(self):
 
        """
 
        Returns list of user groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['user_groups'].iteritems()
 
                if x[1] == 'usergroup.admin']
 

	
 
    @staticmethod
 
    def check_ip_allowed(user, ip_addr):
 
        """
 
        Check if the given IP address (a `str`) is allowed for the given
 
        user (an `AuthUser` or `db.User`).
 
        """
 
        allowed_ips = AuthUser.get_allowed_ips(user.user_id, cache=True)
 
        if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
 
            log.debug('IP:%s is in range of %s', ip_addr, allowed_ips)
 
            return True
 
        else:
 
            log.info('Access for IP:%s forbidden, '
 
                     'not in %s' % (ip_addr, allowed_ips))
 
            return False
 

	
 
    def __repr__(self):
 
        return "<AuthUser('id:%s[%s] auth:%s')>" \
 
            % (self.user_id, self.username, (self.is_authenticated or self.is_default_user))
 

	
 
    def to_cookie(self):
 
        """ Serializes this login session to a cookie `dict`. """
 
        return {
 
            'user_id': self.user_id,
 
            'is_external_auth': self.is_external_auth,
 
        }
 

	
 
    @staticmethod
 
    def from_cookie(cookie):
 
        """
 
        Deserializes an `AuthUser` from a cookie `dict`.
 
        """
 

	
 
        au = AuthUser(
 
            user_id=cookie.get('user_id'),
 
            is_external_auth=cookie.get('is_external_auth', False),
 
        )
 
        au.is_authenticated = True
 
        return au
 

	
 
    @classmethod
 
    def get_allowed_ips(cls, user_id, cache=False):
 
        _set = set()
 

	
 
        default_ips = UserIpMap.query().filter(UserIpMap.user_id ==
 
                                        User.get_default_user(cache=True).user_id)
 
        if cache:
 
            default_ips = default_ips.options(FromCache("sql_cache_short",
 
                                              "get_user_ips_default"))
 
        for ip in default_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 

	
 
        user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
 
        if cache:
 
            user_ips = user_ips.options(FromCache("sql_cache_short",
 
                                                  "get_user_ips_%s" % user_id))
 
        for ip in user_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 
        return _set or set(['0.0.0.0/0', '::/0'])
 

	
 

	
 
def set_available_permissions(config):
 
    """
 
    This function will propagate globals with all available defined
 
    permission given in db. We don't want to check each time from db for new
 
    permissions since adding a new permission also requires application restart
 
    ie. to decorate new views with the newly created permission
 

	
 
    :param config: current config instance
 

	
 
    """
 
    log.info('getting information about all available permissions')
 
    try:
 
        all_perms = Session().query(Permission).all()
 
        config['available_permissions'] = [x.permission_name for x in all_perms]
 
    finally:
 
        Session.remove()
 

	
 

	
 
#==============================================================================
 
# CHECK DECORATORS
 
#==============================================================================
 

	
 
def _redirect_to_login(message=None):
 
    """Return an exception that must be raised. It will redirect to the login
 
    page which will redirect back to the current URL after authentication.
 
    The optional message will be shown in a flash message."""
 
    from kallithea.lib import helpers as h
 
    if message:
 
        h.flash(message, category='warning')
 
    p = request.path_qs
 
    log.debug('Redirecting to login page, origin: %s', p)
 
    return HTTPFound(location=url('login_home', came_from=p))
 

	
 

	
 
# Use as decorator
 
class LoginRequired(object):
 
    """Client must be logged in as a valid User, or we'll redirect to the login
 
    page.
 

	
 
    If the "default" user is enabled and allow_default_user is true, that is
 
    considered valid too.
 

	
 
    Also checks that IP address is allowed, and if using API key instead
 
    of regular cookie authentication, checks that API key access is allowed
 
    (based on `api_access` parameter and the API view whitelist).
 
    """
 

	
 
    def __init__(self, api_access=False, allow_default_user=False):
 
        self.api_access = api_access
 
        self.allow_default_user = allow_default_user
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        controller = fargs[0]
 
        user = request.authuser
 
        loc = "%s:%s" % (controller.__class__.__name__, func.__name__)
 
        log.debug('Checking access for user %s @ %s', user, loc)
 

	
 
        # Check if we used an API key to authenticate.
 
        api_key = user.authenticating_api_key
 
        if api_key is not None:
 
            # Check that controller is enabled for API key usage.
 
            if not self.api_access and not allowed_api_access(loc, api_key=api_key):
 
                # controller does not allow API access
 
                log.warning('API access to %s is not allowed', loc)
 
                raise HTTPForbidden()
 

	
 
            log.info('user %s authenticated with API key ****%s @ %s',
 
                     user, api_key[-4:], loc)
 
            return func(*fargs, **fkwargs)
 

	
 
        # CSRF protection: Whenever a request has ambient authority (whether
 
        # through a session cookie or its origin IP address), it must include
 
        # the correct token, unless the HTTP method is GET or HEAD (and thus
 
        # guaranteed to be side effect free. In practice, the only situation
 
        # where we allow side effects without ambient authority is when the
 
        # authority comes from an API key; and that is handled above.
 
        if request.method not in ['GET', 'HEAD']:
 
            token = request.POST.get(secure_form.token_key)
 
            if not token or token != secure_form.authentication_token():
 
                log.error('CSRF check failed')
 
                raise HTTPForbidden()
 

	
 
        # regular user authentication
 
        if user.is_authenticated:
 
            log.info('user %s authenticated with regular auth @ %s', user, loc)
 
            return func(*fargs, **fkwargs)
 
        elif user.is_default_user:
 
            if self.allow_default_user:
 
                log.info('default user @ %s', loc)
 
                return func(*fargs, **fkwargs)
 
            log.info('default user is not accepted here @ %s', loc)
 
        else:
 
            log.warning('user %s NOT authenticated with regular auth @ %s', user, loc)
 
        raise _redirect_to_login()
 

	
 

	
 
# Use as decorator
 
class NotAnonymous(object):
 
    """Ensures that client is not logged in as the "default" user, and
 
    redirects to the login page otherwise. Must be used together with
 
    LoginRequired."""
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        user = request.authuser
 

	
 
        log.debug('Checking that user %s is not anonymous @%s', user.username, cls)
 

	
 
        if user.is_default_user:
 
            raise _redirect_to_login(_('You need to be a registered user to '
 
                                       'perform this action'))
 
        else:
 
            return func(*fargs, **fkwargs)
 

	
 

	
 
class _PermsDecorator(object):
 
    """Base class for controller decorators with multiple permissions"""
 

	
 
    def __init__(self, *required_perms):
 
        self.required_perms = required_perms # usually very short - a list is thus fine
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        user = request.authuser
 
        log.debug('checking %s permissions %s for %s %s',
 
          self.__class__.__name__, self.required_perms, cls, user)
 

	
 
        if self.check_permissions(user):
 
            log.debug('Permission granted for %s %s', cls, user)
 
            return func(*fargs, **fkwargs)
 

	
 
        else:
 
            log.info('Permission denied for %s %s', cls, user)
 
            if user.is_default_user:
 
                raise _redirect_to_login(_('You need to be signed in to view this page'))
 
            else:
 
                raise HTTPForbidden()
 

	
 
    def check_permissions(self, user):
 
        raise NotImplementedError()
 

	
 

	
 
class HasPermissionAnyDecorator(_PermsDecorator):
 
    """
 
    Checks the user has any of the given global permissions.
 
    """
 

	
 
    def check_permissions(self, user):
 
        global_permissions = user.permissions['global'] # usually very short
 
        return any(p in global_permissions for p in self.required_perms)
 

	
 

	
 
class _PermDecorator(_PermsDecorator):
 
    """Base class for controller decorators with a single permission"""
 

	
 
    def __init__(self, required_perm):
 
        _PermsDecorator.__init__(self, [required_perm])
 
        self.required_perm = required_perm
 

	
 

	
 
class HasRepoPermissionLevelDecorator(_PermDecorator):
 
    """
 
    Checks the user has at least the specified permission level for the requested repository.
 
    """
 

	
 
    def check_permissions(self, user):
 
        repo_name = get_repo_slug(request)
 
        return user.has_repository_permission_level(repo_name, self.required_perm)
 

	
 

	
 
class HasRepoGroupPermissionLevelDecorator(_PermDecorator):
 
    """
 
    Checks the user has any of given permissions for the requested repository group.
 
    """
 

	
 
    def check_permissions(self, user):
 
        repo_group_name = get_repo_group_slug(request)
 
        return user.has_repository_group_permission_level(repo_group_name, self.required_perm)
 

	
 

	
 
class HasUserGroupPermissionLevelDecorator(_PermDecorator):
 
    """
 
    Checks for access permission for any of given predicates for specific
 
    user group. In order to fulfill the request any of predicates must be meet
 
    """
 

	
 
    def check_permissions(self, user):
 
        user_group_name = get_user_group_slug(request)
 
        return user.has_user_group_permission_level(user_group_name, self.required_perm)
 

	
 

	
 
#==============================================================================
 
# CHECK FUNCTIONS
 
#==============================================================================
 

	
 
class _PermsFunction(object):
 
    """Base function for other check functions with multiple permissions"""
 

	
 
    def __init__(self, *required_perms):
 
        self.required_perms = required_perms # usually very short - a list is thus fine
 

	
 
    def __nonzero__(self):
 
        """ Defend against accidentally forgetting to call the object
 
            and instead evaluating it directly in a boolean context,
 
            which could have security implications.
 
        """
 
        raise AssertionError(self.__class__.__name__ + ' is not a bool and must be called!')
 

	
 
    def __call__(self, *a, **b):
 
        raise NotImplementedError()
 

	
 

	
 
class HasPermissionAny(_PermsFunction):
 

	
 
    def __call__(self, purpose=None):
 
        global_permissions = request.authuser.permissions['global'] # usually very short
 
        ok = any(p in global_permissions for p in self.required_perms)
 

	
 
        log.debug('Check %s for global %s (%s): %s',
 
            request.authuser.username, self.required_perms, purpose, ok)
 
        return ok
 

	
 

	
 
class _PermFunction(_PermsFunction):
 
    """Base function for other check functions with a single permission"""
 

	
 
    def __init__(self, required_perm):
 
        _PermsFunction.__init__(self, [required_perm])
 
        self.required_perm = required_perm
 

	
 

	
 
class HasRepoPermissionLevel(_PermFunction):
 

	
 
    def __call__(self, repo_name, purpose=None):
 
        return request.authuser.has_repository_permission_level(repo_name, self.required_perm, purpose)
 

	
 

	
 
class HasRepoGroupPermissionLevel(_PermFunction):
 

	
 
    def __call__(self, group_name, purpose=None):
 
        return request.authuser.has_repository_group_permission_level(group_name, self.required_perm, purpose)
 

	
 

	
 
class HasUserGroupPermissionLevel(_PermFunction):
 

	
 
    def __call__(self, user_group_name, purpose=None):
 
        return request.authuser.has_user_group_permission_level(user_group_name, self.required_perm, purpose)
 

	
 

	
 
#==============================================================================
kallithea/tests/api/api_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Tests for the JSON-RPC web api.
 
"""
 

	
 
import os
 
import random
 
import mock
 
import re
 

	
 
import pytest
 

	
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.db import Repository, User, Setting, Ui, PullRequest, ChangesetStatus, RepoGroup
 
from kallithea.lib.utils2 import time_to_datetime
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = u'test_user_group'
 
TEST_REPO_GROUP = u'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 

	
 
    :param random_id:
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 

	
 
jsonify = lambda obj: json.loads(json.dumps(obj))
 

	
 

	
 
def crash(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
def api_call(test_obj, params):
 
    response = test_obj.app.post(API_URL, content_type='application/json',
 
                                 params=params)
 
    return response
 

	
 

	
 
## helpers
 
def make_user_group(name=TEST_USER_GROUP):
 
    gr = fixture.create_user_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    UserGroupModel().add_user_to_group(user_group=gr,
 
                                       user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
def make_repo_group(name=TEST_REPO_GROUP):
 
    gr = fixture.create_repo_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
class _BaseTestApi(object):
 
    REPO = None
 
    REPO_TYPE = None
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        cls.usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        cls.apikey = cls.usr.api_key
 
        cls.test_user = UserModel().create_or_update(
 
            username='test-api',
 
            password='test',
 
            email='test@example.com',
 
            firstname=u'first',
 
            lastname=u'last'
 
        )
 
        Session().commit()
 
        cls.TEST_USER_LOGIN = cls.test_user.username
 
        cls.apikey_regular = cls.test_user.api_key
 
        cls.default_user_username = User.get_default_user().username
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        pass
 

	
 
    def setup_method(self, method):
 
        make_user_group()
 
        make_repo_group()
 

	
 
    def teardown_method(self, method):
 
        fixture.destroy_user_group(TEST_USER_GROUP)
 
        fixture.destroy_gists()
 
        fixture.destroy_repo_group(TEST_REPO_GROUP)
 

	
 
    def _compare_ok(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': None,
 
            'result': expected
 
        })
 
        given = json.loads(given)
 
        assert expected == given, (expected, given)
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': expected,
 
            'result': None
 
        })
 
        given = json.loads(given)
 
        assert expected == given, (expected, given)
 

	
 
    def test_Optional_object(self):
 
        from kallithea.controllers.api.api import Optional
 

	
 
        option1 = Optional(None)
 
        assert '<Optional:%s>' % None == repr(option1)
 
        assert option1() is None
 

	
 
        assert 1 == Optional.extract(Optional(1))
 
        assert 'trololo' == Optional.extract('trololo')
 

	
 
    def test_Optional_OAttr(self):
 
        from kallithea.controllers.api.api import Optional, OAttr
 

	
 
        option1 = Optional(OAttr('apiuser'))
 
        assert 'apiuser' == Optional.extract(option1)
 

	
 
    def test_OAttr_object(self):
 
        from kallithea.controllers.api.api import OAttr
 

	
 
        oattr1 = OAttr('apiuser')
 
        assert '<OptionalAttr:apiuser>' == repr(oattr1)
 
        assert oattr1() == oattr1
 

	
 
    def test_api_wrong_key(self):
 
        id_, params = _build_data('trololo', 'get_user')
 
        response = api_call(self, params)
 

	
 
        expected = 'Invalid API key'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_null(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_args_is_null(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_is_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_different_args(self):
 
        import string
 
        expected = {
 
            'ascii_letters': string.ascii_letters,
 
            'ws': string.whitespace,
 
            'printables': string.printable
 
        }
 
        id_, params = _build_data(self.apikey, 'test', args=expected)
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 
        self._compare_ok(id_, expected, response.body)
 

	
 
    def test_api_get_users(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        response = api_call(self, params)
 
        ret_all = []
 
        _users = User.query().filter_by(is_default_user=False) \
 
            .order_by(User.username).all()
 
        for usr in _users:
 
            ret = usr.get_api_data()
 
            ret_all.append(jsonify(ret))
 
        expected = ret_all
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_that_does_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` does not exist" % 'trololo'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(self.TEST_USER_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_with_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull_remote(self):
 
        # Note: pulling from local repos is a mis-feature - it will bypass access control
 
        # ... but ok, if the path already has been set in the database
 
        repo_name = u'test_pull'
 
        r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        # hack around that clone_uri can't be set to to a local path
 
        # (as shown by test_api_create_repo_clone_uri_local)
 
        r.clone_uri = os.path.join(Ui.get_by_key('paths', '/').ui_value, self.REPO)
 
        Session().commit()
 

	
 
        pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in Repository.query().filter(Repository.repo_name == repo_name)]
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in Repository.query().filter(Repository.repo_name == repo_name)]
 

	
 
        fixture.destroy_repo(repo_name)
 

	
 
        assert pre_cached_tip != post_cached_tip
 

	
 
    def test_api_pull_fork(self):
 
        fork_name = u'fork'
 
        fixture.create_fork(self.REPO, fork_name)
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=fork_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % fork_name,
 
                    'repository': fork_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_pull_error_no_remote_no_fork(self):
 
        # should fail because no clone_uri is set
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=self.REPO, )
 
        response = api_call(self, params)
 

	
 
        expected = 'Unable to pull changes from `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull_custom_remote(self):
 
        repo_name = u'test_pull_custom_remote'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 

	
 
        custom_remote_path = os.path.join(Ui.get_by_key('paths', '/').ui_value, self.REPO)
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,
 
                                  clone_uri=custom_remote_path)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_rescan_repos(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos')
 
        response = api_call(self, params)
 

	
 
        expected = {'added': [], 'removed': []}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'repo_scan', crash)
 
    def test_api_rescann_error(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos', )
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during rescan repositories action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache(self):
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        repo.scm_instance_cached()  # seed cache
 

	
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': "Cache for repository `%s` was invalidated" % (self.REPO,),
 
            'repository': self.REPO
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
 
    def test_api_invalidate_cache_error(self):
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during cache invalidation action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache_regular_user_no_permission(self):
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        repo.scm_instance_cached() # seed cache
 

	
 
        id_, params = _build_data(self.apikey_regular, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = "repository `%s` does not exist" % (self.REPO,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_existing_user(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN,
 
                                  email='test@example.com',
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user_with_existing_email(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN + 'new',
 
                                  email=TEST_USER_REGULAR_EMAIL,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user(self):
 
        username = 'test_new_api_user'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 

	
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    def test_api_create_user_without_password(self):
 
        username = 'test_new_api_user_passwordless'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email)
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    def test_api_create_user_with_extern_name(self):
 
        username = 'test_new_api_user_passwordless'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email, extern_name='internal')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    @mock.patch.object(UserModel, 'create_or_update', crash)
 
    def test_api_create_user_when_exception_happened(self):
 

	
 
        username = 'test_new_api_user'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 
        expected = 'failed to create user `%s`' % username
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_delete_user(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@example.com',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 
        email = usr.email
 
        usr_id = usr.user_id
 
        ## DELETE THIS USER NOW
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user',
 
                                  userid=username, )
 
        response = api_call(self, params)
 

	
 
        ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
 
               'user': None}
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'delete', crash)
 
    def test_api_delete_user_when_exception_happened(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@example.com',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user',
 
                                  userid=username, )
 
        response = api_call(self, params)
 
        ret = 'failed to delete user ID:%s %s' % (usr.user_id,
 
                                                  usr.username)
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,expected', [
 
        ('firstname', 'new_username'),
 
        ('lastname', 'new_username'),
 
        ('email', 'new_username'),
 
        ('admin', True),
 
        ('admin', False),
 
        ('extern_type', 'ldap'),
 
        ('extern_type', None),
 
        ('extern_name', 'test'),
 
        ('extern_name', None),
 
        ('active', False),
 
        ('active', True),
 
        ('password', 'newpass'),
 
    ])
 
    def test_api_update_user(self, name, expected):
 
        usr = User.get_by_username(self.TEST_USER_LOGIN)
 
        kw = {name: expected,
 
              'userid': usr.user_id}
 
        id_, params = _build_data(self.apikey, 'update_user', **kw)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, self.TEST_USER_LOGIN),
 
            'user': jsonify(User \
 
                .get_by_username(self.TEST_USER_LOGIN) \
 
                .get_api_data())
 
        }
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_no_changed_params(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_by_user_id(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_default_user(self):
 
        usr = User.get_default_user()
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        expected = 'editing default user is forbidden'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'update_user', crash)
 
    def test_api_update_user_when_exception_happens(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = 'failed to update user `%s`' % usr.user_id
 

	
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo(self):
 
        new_group = u'some_new_group'
 
        make_user_group(new_group)
 
        RepoModel().grant_user_group_permission(repo=self.REPO,
 
                                                group_name=new_group,
 
                                                perm='repository.read')
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 
        assert u"tags" not in response.json[u'result']
 
        assert u'pull_requests' not in response.json[u'result']
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {'name': user.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {'name': user_group.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_user_group(new_group)
 

	
 
        id_, params = _build_data(self.apikey, 'get_repo', repoid=self.REPO,
 
                                  with_revision_names=True,
 
                                  with_pullrequests=True)
 
        response = api_call(self, params)
 
        assert u"v0.2.0" in response.json[u'result'][u'tags']
 
        assert u'pull_requests' in response.json[u'result']
 

	
 
    @parametrize('grant_perm', [
 
        ('repository.admin'),
 
        ('repository.write'),
 
        ('repository.read'),
 
    ])
 
    def test_api_get_repo_by_non_admin(self, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        assert 2 == len(repo.repo_to_perm)
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user_obj = user.user
 
            user_data = {'name': user_obj.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group_obj = user_group.users_group
 
            user_group_data = {'name': user_group_obj.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          user=self.default_user_username,
 
                                          perm='repository.none')
 
        try:
 
            RepoModel().grant_user_permission(repo=self.REPO,
 
                                              user=self.TEST_USER_LOGIN,
 
                                              perm='repository.none')
 

	
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 
            id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                      repoid=self.REPO)
 
            response = api_call(self, params)
 

	
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 
            expected = 'repository `%s` does not exist' % (self.REPO)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().grant_user_permission(repo=self.REPO,
 
                                              user=self.default_user_username,
 
                                              perm='repository.read')
 

	
 
    def test_api_get_repo_that_doesn_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid='no-such-repo')
 
        response = api_call(self, params)
 

	
 
        ret = 'repository `%s` does not exist' % 'no-such-repo'
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos(self):
 
        id_, params = _build_data(self.apikey, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        expected = jsonify([
 
            repo.get_api_data()
 
            for repo in Repository.query()
 
        ])
 

	
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        expected = jsonify([
 
            repo.get_api_data()
 
            for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN)
 
        ])
 

	
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parametrize('name,ret_type', [
 
        ('all', 'all'),
 
        ('dirs', 'dirs'),
 
        ('files', 'files'),
 
    ])
 
    def test_api_get_repo_nodes(self, name, ret_type):
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_revisions(self):
 
        rev = 'i-dont-exist'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_path(self):
 
        rev = 'tip'
 
        path = '/idontexits'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_ret_type(self):
 
        rev = 'tip'
 
        path = '/'
 
        ret_type = 'error'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        expected = ('ret_type must be one of %s'
 
                    % (','.join(['files', 'dirs', 'all'])))
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,ret_type,grant_perm', [
 
        ('all', 'all', 'repository.write'),
 
        ('dirs', 'dirs', 'repository.admin'),
 
        ('files', 'files', 'repository.read'),
 
    ])
 
    def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 

	
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_create_repo(self):
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        assert repo is not None
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    @parametrize('repo_name', [
 
        u'',
 
        u'.',
 
        u'..',
 
        u':',
 
        u'/',
 
        u'<test>',
 
    ])
 
    def test_api_create_repo_bad_names(self, repo_name):
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 
        if repo_name == '/':
 
            expected = "repo group `` not found"
 
            self._compare_error(id_, expected, given=response.body)
 
        else:
 
            expected = "failed to create repository `%s`" % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_clone_uri_local(self):
 
        # cloning from local repos was a mis-feature - it would bypass access control
 
        # TODO: introduce other test coverage of actual remote cloning
 
        clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
                                  clone_uri=clone_uri,
 
        )
 
        response = api_call(self, params)
 
        expected = "failed to create repository `%s`" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_and_repo_group(self):
 
        repo_group_name = u'my_gr'
 
        repo_name = u'%s/api-repo' % repo_group_name
 

	
 
        # repo creation can no longer also create repo group
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = u'repo group `%s` not found' % repo_group_name
 
        self._compare_error(id_, expected, given=response.body)
 
        assert RepoModel().get_by_repo_name(repo_name) is None
 

	
 
        # create group before creating repo
 
        rg = fixture.create_repo_group(repo_group_name)
 
        Session().commit()
 

	
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        assert repo is not None
 

	
 
        fixture.destroy_repo(repo_name)
 
        fixture.destroy_repo_group(repo_group_name)
 

	
 
    def test_api_create_repo_in_repo_group_without_permission(self):
 
        repo_group_basename = u'api-repo-repo'
 
        repo_group_name = u'%s/%s' % (TEST_REPO_GROUP, repo_group_basename)
 
        repo_name = u'%s/api-repo' % repo_group_name
 

	
 
        top_group = RepoGroup.get_by_group_name(TEST_REPO_GROUP)
 
        assert top_group
 
        rg = fixture.create_repo_group(repo_group_basename, parent_group_id=top_group)
 
        Session().commit()
 
        RepoGroupModel().grant_user_permission(repo_group_name,
 
                                               self.TEST_USER_LOGIN,
 
                                               'group.none')
 
        Session().commit()
 

	
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        # Current result when API access control is different from Web:
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
        # Expected and arguably more correct result:
 
        #expected = 'failed to create repository `%s`' % repo_name
 
        #self._compare_error(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo_group(repo_group_name)
 

	
 
    def test_api_create_repo_unknown_owner(self):
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=owner,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dont_specify_owner(self):
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        assert repo is not None
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin(self):
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        assert repo is not None
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin_specify_owner(self):
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
                                  owner=owner)
 
        response = api_call(self, params)
 

	
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_exists(self):
 
        repo_name = self.REPO
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = "repo `%s` already exist" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dot_dot(self):
 
        # it is only possible to create repositories in existing repo groups - and '..' can't be used
 
        group_name = '%s/..' % TEST_REPO_GROUP
 
        repo_name = '%s/%s' % (group_name, 'could-be-outside')
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = u'repo group `%s` not found' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(RepoModel, 'create', crash)
 
    def test_api_create_repo_exception_occurred(self):
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = 'failed to create repository `%s`' % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('changing_attr,updates', [
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': u'new description'}),
 
        ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
 
        ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
        ('enable_downloads', {'enable_downloads': True}),
 
        ('name', {'name': u'new_repo_name'}),
 
        ('repo_group', {'group': u'test_group_for_update'}),
 
    ])
 
    def test_api_update_repo(self, changing_attr, updates):
 
        repo_name = u'api_update_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        if changing_attr == 'repo_group':
 
            fixture.create_repo_group(updates['group'])
 

	
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        if changing_attr == 'name':
 
            repo_name = updates['name']
 
        if changing_attr == 'repo_group':
 
            repo_name = u'/'.join([updates['group'], repo_name])
 
        try:
 
            if changing_attr == 'clone_uri' and updates['clone_uri']:
 
                expected = u'failed to update repo `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
            else:
 
                expected = {
 
                    'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
 
                    'repository': repo.get_api_data()
 
                }
 
                self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
 

	
 
    @parametrize('changing_attr,updates', [
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': u'new description'}),
 
        ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
 
        ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
        ('enable_downloads', {'enable_downloads': True}),
 
        ('name', {'name': u'new_repo_name'}),
 
        ('repo_group', {'group': u'test_group_for_update'}),
 
    ])
 
    def test_api_update_group_repo(self, changing_attr, updates):
 
        group_name = u'lololo'
 
        fixture.create_repo_group(group_name)
 
        repo_name = u'%s/api_update_me' % group_name
 
        repo = fixture.create_repo(repo_name, repo_group=group_name, repo_type=self.REPO_TYPE)
 
        if changing_attr == 'repo_group':
 
            fixture.create_repo_group(updates['group'])
 

	
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        if changing_attr == 'name':
 
            repo_name = u'%s/%s' % (group_name, updates['name'])
 
        if changing_attr == 'repo_group':
 
            repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
 
        try:
 
            if changing_attr == 'clone_uri' and updates['clone_uri']:
 
                expected = u'failed to update repo `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
            else:
 
                expected = {
 
                    'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
 
                    'repository': repo.get_api_data()
 
                }
 
                self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
 
        fixture.destroy_repo_group(group_name)
 

	
 
    def test_api_update_repo_repo_group_does_not_exist(self):
 
        repo_name = u'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'group': 'test_group_for_update'}
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository group `%s` does not exist' % updates['group']
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_not_allowed(self):
 
        repo_name = u'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'description': 'something else'}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository `%s` does not exist' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(RepoModel, 'update', crash)
 
    def test_api_update_repo_exception_occurred(self):
 
        repo_name = u'api_update_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update repo `%s`' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name(self):
 
        repo_name = u'admin_owned'
 
        new_repo_name = u'new_repo_name'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'no permission to create (or move) repositories'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name_allowed(self):
 
        repo_name = u'admin_owned'
 
        new_repo_name = u'new_repo_name'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.none')
 
        UserModel().grant_perm('default', 'hg.create.repository')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_owner(self):
 
        repo_name = u'admin_owned'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        updates = {'owner': TEST_USER_ADMIN_LOGIN}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'Only Kallithea admin can specify `owner` param'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo(self):
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 

	
 
        id_, params = _build_data(self.apikey, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin(self):
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin_no_permission(self):
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                      repoid=repo_name, )
 
            response = api_call(self, params)
 
            expected = 'repository `%s` does not exist' % (repo_name)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_exception_occurred(self):
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            with mock.patch.object(RepoModel, 'delete', crash):
 
                id_, params = _build_data(self.apikey, 'delete_repo',
 
                                          repoid=repo_name, )
 
                response = api_call(self, params)
 

	
 
                expected = 'failed to delete repository `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_fork_repo(self):
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    @parametrize('fork_name', [
 
        u'api-repo-fork',
 
        u'%s/api-repo-fork' % TEST_REPO_GROUP,
 
    ])
 
    def test_api_fork_repo_non_admin(self, fork_name):
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_specify_owner(self):
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_no_permission_to_fork(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          user=self.default_user_username,
 
                                          perm='repository.none')
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 
        try:
 
            fork_name = u'api-repo-fork'
 
            id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                      repoid=self.REPO,
 
                                      fork_name=fork_name,
 
            )
 
            response = api_call(self, params)
 
            expected = 'repository `%s` does not exist' % (self.REPO)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().grant_user_permission(repo=self.REPO,
 
                                              user=self.default_user_username,
 
                                              perm='repository.read')
 
            fixture.destroy_repo(fork_name)
 

	
 
    @parametrize('name,perm', [
 
        ('read', 'repository.read'),
 
        ('write', 'repository.write'),
 
        ('admin', 'repository.admin'),
 
    ])
 
    def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
 
        fork_name = u'api-repo-fork'
 
        # regardless of base repository permission, forking is disallowed
 
        # when repository creation is disabled
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=perm)
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'no permission to create repositories'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_unknown_owner(self):
 
        fork_name = u'api-repo-fork'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=owner,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_fork_repo_fork_exists(self):
 
        fork_name = u'api-repo-fork'
 
        fixture.create_fork(self.REPO, fork_name)
 

	
 
        try:
 
            fork_name = u'api-repo-fork'
 

	
 
            id_, params = _build_data(self.apikey, 'fork_repo',
 
                                      repoid=self.REPO,
 
                                      fork_name=fork_name,
 
                                      owner=TEST_USER_ADMIN_LOGIN,
 
            )
 
            response = api_call(self, params)
 

	
 
            expected = "fork `%s` already exist" % fork_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_repo_exists(self):
 
        fork_name = self.REPO
 

	
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = "repo `%s` already exist" % fork_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create_fork', crash)
 
    def test_api_fork_repo_exception_occurred(self):
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
 
                                                               fork_name)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_group(self):
 
        id_, params = _build_data(self.apikey, 'get_user_group',
 
                                  usergroupid=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        user_group = UserGroupModel().get_group(TEST_USER_GROUP)
 
        members = []
 
        for user in user_group.members:
 
            user = user.user
 
            members.append(user.get_api_data())
 

	
 
        ret = user_group.get_api_data()
 
        ret['members'] = members
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_groups(self):
 
        gr_name = u'test_user_group2'
 
        make_user_group(gr_name)
 

	
 
        try:
 
            id_, params = _build_data(self.apikey, 'get_user_groups', )
 
            response = api_call(self, params)
 

	
 
            expected = []
 
            for gr_name in [TEST_USER_GROUP, u'test_user_group2']:
 
                user_group = UserGroupModel().get_group(gr_name)
 
                ret = user_group.get_api_data()
 
                expected.append(ret)
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_create_user_group(self):
 
        group_name = u'some_new_group'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'created new user group `%s`' % group_name,
 
            'user_group': jsonify(UserGroupModel() \
 
                .get_by_name(group_name) \
 
                .get_api_data())
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_user_group(group_name)
 

	
 
    def test_api_get_user_group_that_exist(self):
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        expected = "user group `%s` already exist" % TEST_USER_GROUP
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'create', crash)
 
    def test_api_get_user_group_exception_occurred(self):
 
        group_name = u'exception_happens'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to create group `%s`' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('changing_attr,updates', [
 
        ('group_name', {'group_name': u'new_group_name'}),
 
        ('group_name', {'group_name': u'test_group_for_update'}),
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('active', {'active': False}),
 
        ('active', {'active': True}),
 
    ])
 
    def test_api_update_user_group(self, changing_attr, updates):
 
        gr_name = u'test_group_for_update'
 
        user_group = fixture.create_user_group(gr_name)
 
        try:
 
            id_, params = _build_data(self.apikey, 'update_user_group',
 
                                      usergroupid=gr_name, **updates)
 
            response = api_call(self, params)
 
            expected = {
 
               'msg': 'updated user group ID:%s %s' % (user_group.users_group_id,
 
                                                     user_group.users_group_name),
 
               'user_group': user_group.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            if changing_attr == 'group_name':
 
                # switch to updated name for proper cleanup
 
                gr_name = updates['group_name']
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'update', crash)
 
    def test_api_update_user_group_exception_occurred(self):
 
        gr_name = u'test_group'
 
        fixture.create_user_group(gr_name)
 
        try:
 
            id_, params = _build_data(self.apikey, 'update_user_group',
 
                                      usergroupid=gr_name)
 
            response = api_call(self, params)
 
            expected = 'failed to update user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group(self):
 
        gr_name = u'test_group'
 
        fixture.create_user_group(gr_name)
 
        try:
 
            id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                      usergroupid=gr_name,
 
                                      userid=TEST_USER_ADMIN_LOGIN)
 
            response = api_call(self, params)
 
            expected = {
 
            'msg': 'added member `%s` to user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name),
 
            'success': True
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group_that_doesnt_exist(self):
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid='false-group',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'user group `%s` does not exist' % 'false-group'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'add_user_to_group', crash)
 
    def test_api_add_user_to_user_group_exception_occurred(self):
 
        gr_name = u'test_group'
 
        fixture.create_user_group(gr_name)
 
        try:
 
            id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                      usergroupid=gr_name,
 
                                      userid=TEST_USER_ADMIN_LOGIN)
 
            response = api_call(self, params)
 
            expected = 'failed to add member to user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_remove_user_from_user_group(self):
 
        gr_name = u'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        try:
 
            id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                      usergroupid=gr_name,
 
                                      userid=TEST_USER_ADMIN_LOGIN)
 
            response = api_call(self, params)
 
            expected = {
 
                'msg': 'removed member `%s` from user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name
 
                ),
 
                'success': True}
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'remove_user_from_group', crash)
 
    def test_api_remove_user_from_user_group_exception_occurred(self):
 
        gr_name = u'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        try:
 
            id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                      usergroupid=gr_name,
 
                                      userid=TEST_USER_ADMIN_LOGIN)
 
            response = api_call(self, params)
 
            expected = 'failed to remove member from user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group(self):
 
        gr_name = u'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 
        try:
 
            id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                      usergroupid=gr_name)
 
            response = api_call(self, params)
 
            expected = {
 
                'user_group': None,
 
                'msg': 'deleted user group ID:%s %s' % (gr_id, gr_name)
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            if UserGroupModel().get_by_name(gr_name):
 
                fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group_that_is_assigned(self):
 
        gr_name = u'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 

	
 
        ugr_to_perm = RepoModel().grant_user_group_permission(self.REPO, gr_name, 'repository.write')
 
        msg = 'User Group assigned to %s' % ugr_to_perm.repository.repo_name
 

	
 
        try:
 
            id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                      usergroupid=gr_name)
 
            response = api_call(self, params)
 
            expected = msg
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            if UserGroupModel().get_by_name(gr_name):
 
                fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group_exception_occurred(self):
 
        gr_name = u'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name)
 

	
 
        try:
 
            with mock.patch.object(UserGroupModel, 'delete', crash):
 
                response = api_call(self, params)
 
                expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name)
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @parametrize('name,perm', [
 
        ('none', 'repository.none'),
 
        ('read', 'repository.read'),
 
        ('write', 'repository.write'),
 
        ('admin', 'repository.admin'),
 
    ])
 
    def test_api_grant_user_permission(self, name, perm):
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
 
                perm, TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_grant_user_permission_wrong_permission(self):
 
        perm = 'haha.no.permission'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'permission `%s` does not exist' % perm
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'grant_user_permission', crash)
 
    def test_api_grant_user_permission_exception_when_adding(self):
 
        perm = 'repository.read'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
 
            TEST_USER_ADMIN_LOGIN, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_revoke_user_permission(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN, )
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
 
                TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'revoke_user_permission', crash)
 
    def test_api_revoke_user_permission_exception_when_adding(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission',
 
                                  repoid=self.REPO,
kallithea/tests/functional/test_forks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import urllib
 
import unittest
 

	
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 

	
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.model.db import Repository, User
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 

	
 
fixture = Fixture()
 

	
 

	
 
class _BaseTestCase(TestController):
 
    """
 
    Write all tests here
 
    """
 
    REPO = None
 
    REPO_TYPE = None
 
    NEW_REPO = None
 
    REPO_FORK = None
 

	
 
    def setup_method(self, method):
 
        self.username = u'forkuser'
 
        self.password = u'qweqwe'
 
        u1 = fixture.create_user(self.username, password=self.password, email=u'fork_king@example.com')
 
        self.u1_id = u1.user_id
 
        Session().commit()
 

	
 
    def teardown_method(self, method):
 
        fixture.destroy_user(self.u1_id)
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        repo_name = self.REPO
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain("""There are no forks yet""")
 

	
 
    def test_no_permissions_to_fork(self):
 
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)['user_id']
 
        try:
 
            user_model = UserModel()
 
            usr = User.get_default_user()
 
            user_model.revoke_perm(usr, 'hg.fork.repository')
 
            user_model.grant_perm(usr, 'hg.fork.none')
 
            Session().commit()
 
            # try create a fork
 
            repo_name = self.REPO
 
            self.app.post(url(controller='forks', action='fork_create',
 
                              repo_name=repo_name), {'_authentication_token': self.authentication_token()}, status=403)
 
        finally:
 
            usr = User.get_default_user()
 
            user_model.revoke_perm(usr, 'hg.fork.none')
 
            user_model.grant_perm(usr, 'hg.fork.repository')
 
            Session().commit()
 

	
 
    def test_index_with_fork(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        fork_name = self.REPO_FORK
 
        description = 'fork of vcs test'
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': u'-1',
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': description,
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 

	
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (fork_name, fork_name)
 
        )
 

	
 
        # remove this fork
 
        response = self.app.post(url('delete_repo', repo_name=fork_name),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
    def test_fork_create_into_group(self):
 
        self.log_user()
 
        group = fixture.create_repo_group(u'vc')
 
        group_id = group.group_id
 
        fork_name = self.REPO_FORK
 
        fork_name_full = 'vc/%s' % fork_name
 
        description = 'fork of vcs test'
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': group_id,
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': description,
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 
        repo = Repository.get_by_repo_name(fork_name_full)
 
        assert repo.fork.repo_name == self.REPO
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=fork_name_full))
 
        # test if we have a message that fork is ok
 
        self.checkSessionFlash(response,
 
                'Forked repository %s as <a href="/%s">%s</a>'
 
                % (repo_name, fork_name_full, fork_name_full))
 

	
 
        # test if the fork was created in the database
 
        fork_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == fork_name_full).one()
 

	
 
        assert fork_repo.repo_name == fork_name_full
 
        assert fork_repo.fork.repo_name == repo_name
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=fork_name_full))
 
        response.mustcontain(fork_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 
        response.mustcontain('Fork of "<a href="/%s">%s</a>"' % (repo_name, repo_name))
 

	
 
        fixture.destroy_repo(fork_name_full)
 
        fixture.destroy_repo_group(group_id)
 

	
 
    def test_fork_unicode(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        fork_name = safe_str(self.REPO_FORK + u'-rødgrød')
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': u'-1',
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': 'unicode repo 1',
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (urllib.quote(fork_name), fork_name)
 
        )
 
        fork_repo = Repository.get_by_repo_name(safe_unicode(fork_name))
 
        assert fork_repo
 

	
 
        # fork the fork
 
        fork_name_2 = safe_str(self.REPO_FORK + u'-blåbærgrød')
 
        creation_args = {
 
            'repo_name': fork_name_2,
 
            'repo_group': u'-1',
 
            'fork_parent_id': fork_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': 'unicode repo 2',
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=fork_name), creation_args)
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=fork_name))
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (urllib.quote(fork_name_2), fork_name_2)
 
        )
 

	
 
        # remove these forks
 
        response = self.app.post(url('delete_repo', repo_name=fork_name_2),
 
            params={'_authentication_token': self.authentication_token()})
 
        response = self.app.post(url('delete_repo', repo_name=fork_name),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
    def test_fork_create_and_permissions(self):
 
        self.log_user()
 
        fork_name = self.REPO_FORK
 
        description = 'fork of vcs test'
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': u'-1',
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': description,
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 
        repo = Repository.get_by_repo_name(self.REPO_FORK)
 
        assert repo.fork.repo_name == self.REPO
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=fork_name))
 
        # test if we have a message that fork is ok
 
        self.checkSessionFlash(response,
 
                'Forked repository %s as <a href="/%s">%s</a>'
 
                % (repo_name, fork_name, fork_name))
 

	
 
        # test if the fork was created in the database
 
        fork_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == fork_name).one()
 

	
 
        assert fork_repo.repo_name == fork_name
 
        assert fork_repo.fork.repo_name == repo_name
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=fork_name))
 
        response.mustcontain(fork_name)
 
        response.mustcontain(self.REPO_TYPE)
 
        response.mustcontain('Fork of "<a href="/%s">%s</a>"' % (repo_name, repo_name))
 

	
 
        usr = self.log_user(self.username, self.password)['user_id']
 

	
 
        forks = Repository.query() \
 
            .filter(Repository.repo_type == self.REPO_TYPE) \
 
            .filter(Repository.fork_id != None).all()
 
        assert 1 == len(forks)
 

	
 
        # set read permissions for this
 
        RepoModel().grant_user_permission(repo=forks[0],
 
                                          user=usr,
 
                                          perm='repository.read')
 
        Session().commit()
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain('<div>fork of vcs test</div>')
 

	
 
        # remove permissions
 
        default_user = User.get_default_user()
 
        try:
 
            RepoModel().grant_user_permission(repo=forks[0],
 
                                              user=usr, perm='repository.none')
 
            RepoModel().grant_user_permission(repo=forks[0],
 
                                              user=default_user, perm='repository.none')
 
            Session().commit()
 

	
 
            # fork shouldn't be visible
 
            response = self.app.get(url(controller='forks', action='forks',
 
                                        repo_name=repo_name))
 
            response.mustcontain('There are no forks yet')
 

	
 
        finally:
 
            RepoModel().grant_user_permission(repo=forks[0],
 
                                              user=usr, perm='repository.read')
 
            RepoModel().grant_user_permission(repo=forks[0],
 
                                              user=default_user, perm='repository.read')
 
            RepoModel().delete(repo=forks[0])
 

	
 

	
 
class TestGIT(_BaseTestCase):
 
    REPO = GIT_REPO
 
    NEW_REPO = NEW_GIT_REPO
 
    REPO_TYPE = 'git'
 
    REPO_FORK = GIT_FORK
 

	
 

	
 
class TestHG(_BaseTestCase):
 
    REPO = HG_REPO
 
    NEW_REPO = NEW_HG_REPO
 
    REPO_TYPE = 'hg'
 
    REPO_FORK = HG_FORK
kallithea/tests/models/test_permissions.py
Show inline comments
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.db import RepoGroup, User, UserGroupRepoGroupToPerm, \
 
    Permission, UserToPerm
 
from kallithea.model.user import UserModel
 

	
 
from kallithea.model.meta import Session
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.permission import PermissionModel
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestPermissions(TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        # recreate default user to get a clean start
 
        PermissionModel().create_default_permissions(user=User.DEFAULT_USER,
 
                                                     force=True)
 
        Session().commit()
 

	
 
    def setup_method(self, method):
 
        self.u1 = UserModel().create_or_update(
 
            username=u'u1', password=u'qweqwe',
 
            email=u'u1@example.com', firstname=u'u1', lastname=u'u1'
 
        )
 
        self.u2 = UserModel().create_or_update(
 
            username=u'u2', password=u'qweqwe',
 
            email=u'u2@example.com', firstname=u'u2', lastname=u'u2'
 
        )
 
        self.u3 = UserModel().create_or_update(
 
            username=u'u3', password=u'qweqwe',
 
            email=u'u3@example.com', firstname=u'u3', lastname=u'u3'
 
        )
 
        self.anon = User.get_default_user()
 
        self.a1 = UserModel().create_or_update(
 
            username=u'a1', password=u'qweqwe',
 
            email=u'a1@example.com', firstname=u'a1', lastname=u'a1', admin=True
 
        )
 
        Session().commit()
 

	
 
    def teardown_method(self, method):
 
        if hasattr(self, 'test_repo'):
 
            RepoModel().delete(repo=self.test_repo)
 

	
 
        UserModel().delete(self.u1)
 
        UserModel().delete(self.u2)
 
        UserModel().delete(self.u3)
 
        UserModel().delete(self.a1)
 

	
 
        Session().commit() # commit early to avoid SQLAlchemy warning from double cascade delete to users_groups_members
 

	
 
        if hasattr(self, 'g1'):
 
            RepoGroupModel().delete(self.g1.group_id)
 
        if hasattr(self, 'g2'):
 
            RepoGroupModel().delete(self.g2.group_id)
 

	
 
        if hasattr(self, 'ug2'):
 
            UserGroupModel().delete(self.ug2, force=True)
 
        if hasattr(self, 'ug1'):
 
            UserGroupModel().delete(self.ug1, force=True)
 

	
 
        Session().commit()
 

	
 
    def test_default_perms_set(self):
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.create.repository', 'repository.read',
 
                           'hg.register.manual_activate']),
 
            'repositories': {HG_REPO: 'repository.read'}
 
        }
 
        assert u1_auth.permissions['repositories'][HG_REPO] == perms['repositories'][HG_REPO]
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
 
                                          perm=new_perm)
 
        Session().commit()
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][HG_REPO] == new_perm
 

	
 
    def test_default_admin_perms_set(self):
 
        a1_auth = AuthUser(user_id=self.a1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.admin', 'hg.create.write_on_repogroup.true']),
 
            'repositories': {HG_REPO: 'repository.admin'}
 
        }
 
        assert a1_auth.permissions['repositories'][HG_REPO] == perms['repositories'][HG_REPO]
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
 
                                          perm=new_perm)
 
        Session().commit()
 
        # cannot really downgrade admins permissions !? they still gets set as
 
        # admin !
 
        u1_auth = AuthUser(user_id=self.a1.user_id)
 
        assert u1_auth.permissions['repositories'][HG_REPO] == perms['repositories'][HG_REPO]
 

	
 
    def test_default_group_perms(self):
 
        self.g1 = fixture.create_repo_group(u'test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group(u'test2', skip_if_exists=True)
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
 
            'global': set(Permission.DEFAULT_USER_PERMISSIONS),
 
            'repositories': {HG_REPO: 'repository.read'}
 
        }
 
        assert u1_auth.permissions['repositories'][HG_REPO] == perms['repositories'][HG_REPO]
 
        assert u1_auth.permissions['repositories_groups'] == perms['repositories_groups']
 
        assert u1_auth.permissions['global'] == perms['global']
 

	
 
    def test_default_admin_group_perms(self):
 
        self.g1 = fixture.create_repo_group(u'test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group(u'test2', skip_if_exists=True)
 
        a1_auth = AuthUser(user_id=self.a1.user_id)
 
        perms = {
 
            'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
 
            'global': set(['hg.admin', 'hg.create.write_on_repogroup.true']),
 
            'repositories': {HG_REPO: 'repository.admin'}
 
        }
 

	
 
        assert a1_auth.permissions['repositories'][HG_REPO] == perms['repositories'][HG_REPO]
 
        assert a1_auth.permissions['repositories_groups'] == perms['repositories_groups']
 

	
 
    def test_propagated_permission_from_users_group_by_explicit_perms_exist(self):
 
        # make group
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 

	
 
        # set permission to lower
 
        new_perm = 'repository.none'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
 
        # set user permission none
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm='repository.none')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][HG_REPO] == new_perm
 
        assert u1_auth.permissions['repositories'][HG_REPO] == 'repository.read' # inherit from default user
 

	
 
        # grant perm for group this should not override permission from user
 
        # since it has explicitly set
 
        new_perm_gr = 'repository.write'
 
        # grant perm for group this should override permission from user
 
        RepoModel().grant_user_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_gr)
 
        # check perms
 
                                                 perm='repository.write')
 

	
 
        # verify that user group permissions win
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.create.repository', 'repository.read',
 
                           'hg.register.manual_activate']),
 
            'repositories': {HG_REPO: 'repository.read'}
 
        }
 
        assert u1_auth.permissions['repositories'][HG_REPO] == new_perm
 
        assert u1_auth.permissions['repositories_groups'] == perms['repositories_groups']
 
        assert u1_auth.permissions['repositories'][HG_REPO] == 'repository.write'
 

	
 
    def test_propagated_permission_from_users_group(self):
 
        # make group
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u3)
 

	
 
        # grant perm for group this should override default permission from user
 
        new_perm_gr = 'repository.write'
 
        RepoModel().grant_user_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_gr)
 
        # check perms
 
        u3_auth = AuthUser(user_id=self.u3.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.create.repository', 'repository.read',
 
                           'hg.register.manual_activate']),
 
            'repositories': {HG_REPO: 'repository.read'}
 
        }
 
        assert u3_auth.permissions['repositories'][HG_REPO] == new_perm_gr
 
        assert u3_auth.permissions['repositories_groups'] == perms['repositories_groups']
 

	
 
    def test_propagated_permission_from_users_group_lower_weight(self):
 
        # make group
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        # add user to group
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 

	
 
        # set permission to lower
 
        new_perm_h = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
 
                                          perm=new_perm_h)
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][HG_REPO] == new_perm_h
 

	
 
        # grant perm for group this should NOT override permission from user
 
        # since it's lower than granted
 
        new_perm_l = 'repository.read'
 
        RepoModel().grant_user_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_l)
 
        # check perms
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.create.repository', 'repository.read',
 
                           'hg.register.manual_activate']),
 
            'repositories': {HG_REPO: 'repository.write'}
 
        }
 
        assert u1_auth.permissions['repositories'][HG_REPO] == new_perm_h
 
        assert u1_auth.permissions['repositories_groups'] == perms['repositories_groups']
 

	
 
    def test_repo_in_group_permissions(self):
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group(u'group2', skip_if_exists=True)
 
        # both perms should be read !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.read', u'group2': u'group.read'}
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.read', u'group2': u'group.read'}
 

	
 
        # Change perms to none for both groups
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1,
 
                                               user=self.anon,
 
                                               perm='group.none')
 
        RepoGroupModel().grant_user_permission(repo_group=self.g2,
 
                                               user=self.anon,
 
                                               perm='group.none')
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.none', u'group2': u'group.none'}
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.none', u'group2': u'group.none'}
 

	
 
        # add repo to group
 
        name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
 
        self.test_repo = fixture.create_repo(name=name,
 
                                             repo_type='hg',
 
                                             repo_group=self.g1,
 
                                             cur_user=self.u1,)
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.none', u'group2': u'group.none'}
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.none', u'group2': u'group.none'}
 

	
 
        # grant permission for u2 !
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1, user=self.u2,
 
                                               perm='group.read')
 
        RepoGroupModel().grant_user_permission(repo_group=self.g2, user=self.u2,
 
                                               perm='group.read')
 
        Session().commit()
 
        assert self.u1 != self.u2
 
        # u1 and anon should have not change perms while u2 should !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.none', u'group2': u'group.none'}
 

	
 
        u2_auth = AuthUser(user_id=self.u2.user_id)
 
        assert u2_auth.permissions['repositories_groups'] == {u'group1': u'group.read', u'group2': u'group.read'}
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.none', u'group2': u'group.none'}
 

	
 
    def test_repo_group_user_as_user_group_member(self):
 
        # create Group1
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.read'}
 

	
 
        # set default permission to none
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1,
 
                                               user=self.anon,
 
                                               perm='group.none')
 
        # make group
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        # add user to group
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 
        Session().commit()
 

	
 
        # check if user is in the group
 
        members = [x.user_id for x in UserGroupModel().get(self.ug1.users_group_id).members]
 
        assert members == [self.u1.user_id]
 
        # add some user to that group
 

	
 
        # check his permissions
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.none'}
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.none'}
 

	
 
        # grant ug1 read permissions for
 
        RepoGroupModel().grant_user_group_permission(repo_group=self.g1,
 
                                                      group_name=self.ug1,
 
                                                      perm='group.read')
 
        Session().commit()
 
        # check if the
 
        obj = Session().query(UserGroupRepoGroupToPerm) \
 
            .filter(UserGroupRepoGroupToPerm.group == self.g1) \
 
            .filter(UserGroupRepoGroupToPerm.users_group == self.ug1) \
 
            .scalar()
 
        assert obj.permission.permission_name == 'group.read'
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.none'}
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.read'}
 

	
 
    def test_inherit_nice_permissions_from_default_user(self):
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited permissions from default user
 
        assert u1_auth.permissions['global'] == set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inherit_sad_permissions_from_default_user(self):
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited permissions from default user
 
        assert u1_auth.permissions['global'] == set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inherit_more_permissions_from_default_user(self):
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 

	
 
        # disable global perms on specific user
 
        user_model.revoke_perm(self.u1, 'hg.create.repository')
 
        user_model.grant_perm(self.u1, 'hg.create.none')
 
        user_model.revoke_perm(self.u1, 'hg.fork.repository')
 
        user_model.grant_perm(self.u1, 'hg.fork.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited more permissions from default user
 
        assert u1_auth.permissions['global'] == set([
 
                              'hg.create.repository',
 
                              'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inherit_less_permissions_from_default_user(self):
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        # enable global perms on specific user
 
        user_model.revoke_perm(self.u1, 'hg.create.none')
 
        user_model.grant_perm(self.u1, 'hg.create.repository')
 
        user_model.revoke_perm(self.u1, 'hg.fork.none')
 
        user_model.grant_perm(self.u1, 'hg.fork.repository')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited less permissions from default user
 
        assert u1_auth.permissions['global'] == set([
 
                              'hg.create.repository',
 
                              'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inactive_user_group_does_not_affect_global_permissions(self):
 
        # Add user to inactive user group, set specific permissions on user
 
        # group and and verify it really is inactive.
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # enable fork and create on user group
 
        user_group_model.revoke_perm(self.ug1, perm='hg.create.none')
 
        user_group_model.grant_perm(self.ug1, perm='hg.create.repository')
 
        user_group_model.revoke_perm(self.ug1, perm='hg.fork.none')
 
        user_group_model.grant_perm(self.ug1, perm='hg.fork.repository')
 

	
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        assert u1_auth.permissions['global'] == set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inactive_user_group_does_not_affect_global_permissions_inverse(self):
 
        # Add user to inactive user group, set specific permissions on user
 
        # group and and verify it really is inactive.
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # disable fork and create on user group
 
        user_group_model.revoke_perm(self.ug1, perm='hg.create.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.create.none')
 
        user_group_model.revoke_perm(self.ug1, perm='hg.fork.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.fork.none')
 

	
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        assert u1_auth.permissions['global'] == set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable admin access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.admin')
 
        # enable only write access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.write'
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable only write access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.write')
 
        # enable admin access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable admin access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.admin')
 
        # enable only write access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.write'}
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable only write access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
0 comments (0 inline, 0 general)