Changeset - 93834966ae01
[Not reviewed]
default
0 11 0
Mads Kiilerich - 7 years ago 2018-12-31 02:25:11
mads@kiilerich.com
auth: global permissions given to the default user are the bare minimum and should apply to *all* other users too

Drop the "subtractive permission" config option "inherit_from_default" that
when set to false would give users less global permissions than the default
unauthenticated user.

Instead, think positive and merge all positive permissions.

At the end, filter the global permissions to make sure we for each kind of
permissions only keep the one with most weight.
11 files changed with 31 insertions and 116 deletions:
0 comments (0 inline, 0 general)
kallithea/config/rcextensions/__init__.py
Show inline comments
 
@@ -60,161 +60,159 @@ CREATE_REPO_HOOK = _crrepohook
 
# this function will be executed before each user is created
 
def _pre_cruserhook(*args, **kwargs):
 
    """
 
    Pre create user HOOK, it returns a tuple of bool, reason.
 
    If bool is False the user creation will be stopped and reason
 
    will be displayed to the user.
 
    kwargs available:
 
    :param username:
 
    :param password:
 
    :param email:
 
    :param firstname:
 
    :param lastname:
 
    :param active:
 
    :param admin:
 
    :param created_by:
 
    """
 
    reason = 'allowed'
 
    return True, reason
 

	
 

	
 
PRE_CREATE_USER_HOOK = _pre_cruserhook
 

	
 
#==============================================================================
 
# POST CREATE USER HOOK
 
#==============================================================================
 
# this function will be executed after each user is created
 
def _cruserhook(*args, **kwargs):
 
    """
 
    Post create user HOOK
 
    kwargs available:
 
      :param username:
 
      :param full_name_or_username:
 
      :param full_contact:
 
      :param user_id:
 
      :param name:
 
      :param firstname:
 
      :param short_contact:
 
      :param admin:
 
      :param lastname:
 
      :param ip_addresses:
 
      :param ldap_dn:
 
      :param email:
 
      :param api_key:
 
      :param last_login:
 
      :param full_name:
 
      :param active:
 
      :param password:
 
      :param emails:
 
      :param inherit_default_permissions:
 
      :param created_by:
 
    """
 
    return 0
 

	
 

	
 
CREATE_USER_HOOK = _cruserhook
 

	
 

	
 
#==============================================================================
 
# POST DELETE REPOSITORY HOOK
 
#==============================================================================
 
# this function will be executed after each repository deletion
 
def _dlrepohook(*args, **kwargs):
 
    """
 
    Post delete repository HOOK
 
    kwargs available:
 
     :param repo_name:
 
     :param repo_type:
 
     :param description:
 
     :param private:
 
     :param created_on:
 
     :param enable_downloads:
 
     :param repo_id:
 
     :param owner_id:
 
     :param enable_statistics:
 
     :param clone_uri:
 
     :param fork_id:
 
     :param group_id:
 
     :param deleted_by:
 
     :param deleted_on:
 
    """
 
    return 0
 

	
 

	
 
DELETE_REPO_HOOK = _dlrepohook
 

	
 

	
 
#==============================================================================
 
# POST DELETE USER HOOK
 
#==============================================================================
 
# this function will be executed after each user is deleted
 
def _dluserhook(*args, **kwargs):
 
    """
 
    Post delete user HOOK
 
    kwargs available:
 
      :param username:
 
      :param full_name_or_username:
 
      :param full_contact:
 
      :param user_id:
 
      :param name:
 
      :param firstname:
 
      :param short_contact:
 
      :param admin:
 
      :param lastname:
 
      :param ip_addresses:
 
      :param ldap_dn:
 
      :param email:
 
      :param api_key:
 
      :param last_login:
 
      :param full_name:
 
      :param active:
 
      :param password:
 
      :param emails:
 
      :param inherit_default_permissions:
 
      :param deleted_by:
 
    """
 
    return 0
 

	
 

	
 
DELETE_USER_HOOK = _dluserhook
 

	
 

	
 
#==============================================================================
 
# POST PUSH HOOK
 
#==============================================================================
 

	
 
# this function will be executed after each push it's executed after the
 
# build-in hook that Kallithea uses for logging pushes
 
def _pushhook(*args, **kwargs):
 
    """
 
    Post push hook
 
    kwargs available:
 

	
 
      :param server_url: url of instance that triggered this hook
 
      :param config: path to .ini config used
 
      :param scm: type of VS 'git' or 'hg'
 
      :param username: name of user who pushed
 
      :param ip: ip of who pushed
 
      :param action: push
 
      :param repository: repository name
 
      :param pushed_revs: list of pushed revisions
 
    """
 
    return 0
 

	
 

	
 
PUSH_HOOK = _pushhook
 

	
 

	
 
#==============================================================================
 
# POST PULL HOOK
 
#==============================================================================
 

	
 
# this function will be executed after each push it's executed after the
 
# build-in hook that Kallithea uses for logging pulls
 
def _pullhook(*args, **kwargs):
 
    """
 
    Post pull hook
 
    kwargs available::
 

	
 
      :param server_url: url of instance that triggered this hook
 
      :param config: path to .ini config used
 
      :param scm: type of VS 'git' or 'hg'
kallithea/controllers/admin/user_groups.py
Show inline comments
 
@@ -324,97 +324,95 @@ class UserGroupsController(BaseControlle
 
        ugroup_repo_perms = UserGroupRepoToPerm.query() \
 
            .options(joinedload(UserGroupRepoToPerm.permission)) \
 
            .options(joinedload(UserGroupRepoToPerm.repository)) \
 
            .filter(UserGroupRepoToPerm.users_group_id == id) \
 
            .all()
 

	
 
        for gr in ugroup_repo_perms:
 
            permissions['repositories'][gr.repository.repo_name]  \
 
                = gr.permission.permission_name
 

	
 
        ugroup_group_perms = UserGroupRepoGroupToPerm.query() \
 
            .options(joinedload(UserGroupRepoGroupToPerm.permission)) \
 
            .options(joinedload(UserGroupRepoGroupToPerm.group)) \
 
            .filter(UserGroupRepoGroupToPerm.users_group_id == id) \
 
            .all()
 

	
 
        for gr in ugroup_group_perms:
 
            permissions['repositories_groups'][gr.group.group_name] \
 
                = gr.permission.permission_name
 
        c.permissions = permissions
 

	
 
        ug_model = UserGroupModel()
 

	
 
        defaults = c.user_group.get_dict()
 
        defaults.update({
 
            'create_repo_perm': ug_model.has_perm(c.user_group,
 
                                                  'hg.create.repository'),
 
            'create_user_group_perm': ug_model.has_perm(c.user_group,
 
                                                        'hg.usergroup.create.true'),
 
            'fork_repo_perm': ug_model.has_perm(c.user_group,
 
                                                'hg.fork.repository'),
 
        })
 

	
 
        return htmlfill.render(
 
            render('admin/user_groups/user_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def update_default_perms(self, id):
 
        user_group = UserGroup.get_or_404(id)
 

	
 
        try:
 
            form = CustomDefaultPermissionsForm()()
 
            form_result = form.to_python(request.POST)
 

	
 
            inherit_perms = form_result['inherit_default_permissions']
 
            user_group.inherit_default_permissions = inherit_perms
 
            usergroup_model = UserGroupModel()
 

	
 
            defs = UserGroupToPerm.query() \
 
                .filter(UserGroupToPerm.users_group == user_group) \
 
                .all()
 
            for ug in defs:
 
                Session().delete(ug)
 

	
 
            if form_result['create_repo_perm']:
 
                usergroup_model.grant_perm(id, 'hg.create.repository')
 
            else:
 
                usergroup_model.grant_perm(id, 'hg.create.none')
 
            if form_result['create_user_group_perm']:
 
                usergroup_model.grant_perm(id, 'hg.usergroup.create.true')
 
            else:
 
                usergroup_model.grant_perm(id, 'hg.usergroup.create.false')
 
            if form_result['fork_repo_perm']:
 
                usergroup_model.grant_perm(id, 'hg.fork.repository')
 
            else:
 
                usergroup_model.grant_perm(id, 'hg.fork.none')
 

	
 
            h.flash(_("Updated permissions"), category='success')
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during permissions saving'),
 
                    category='error')
 

	
 
        raise HTTPFound(location=url('edit_user_group_default_perms', id=id))
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def edit_advanced(self, id):
 
        c.user_group = UserGroup.get_or_404(id)
 
        c.active = 'advanced'
 
        c.group_members_obj = sorted((x.user for x in c.user_group.members),
 
                                     key=lambda u: u.username.lower())
 
        return render('admin/user_groups/user_group_edit.html')
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def edit_members(self, id):
 
        c.user_group = UserGroup.get_or_404(id)
 
        c.active = 'members'
 
        c.group_members_obj = sorted((x.user for x in c.user_group.members),
 
                                     key=lambda u: u.username.lower())
 

	
 
        c.group_members = [(x.user_id, x.username) for x in c.group_members_obj]
 
        return render('admin/user_groups/user_group_edit.html')
kallithea/controllers/admin/users.py
Show inline comments
 
@@ -270,165 +270,162 @@ class UsersController(BaseController):
 
        Session().commit()
 
        h.flash(_("API key successfully created"), category='success')
 
        raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))
 

	
 
    def delete_api_key(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        api_key = request.POST.get('del_api_key')
 
        if request.POST.get('del_api_key_builtin'):
 
            c.user.api_key = generate_api_key()
 
            Session().commit()
 
            h.flash(_("API key successfully reset"), category='success')
 
        elif api_key:
 
            ApiKeyModel().delete(api_key, c.user.user_id)
 
            Session().commit()
 
            h.flash(_("API key successfully deleted"), category='success')
 

	
 
        raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))
 

	
 
    def update_account(self, id):
 
        pass
 

	
 
    def edit_perms(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'perms'
 
        c.perm_user = AuthUser(dbuser=c.user)
 

	
 
        umodel = UserModel()
 
        defaults = c.user.get_dict()
 
        defaults.update({
 
            'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
 
            'create_user_group_perm': umodel.has_perm(c.user,
 
                                                      'hg.usergroup.create.true'),
 
            'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
 
        })
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def update_perms(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 

	
 
        try:
 
            form = CustomDefaultPermissionsForm()()
 
            form_result = form.to_python(request.POST)
 

	
 
            inherit_perms = form_result['inherit_default_permissions']
 
            user.inherit_default_permissions = inherit_perms
 
            user_model = UserModel()
 

	
 
            defs = UserToPerm.query() \
 
                .filter(UserToPerm.user == user) \
 
                .all()
 
            for ug in defs:
 
                Session().delete(ug)
 

	
 
            if form_result['create_repo_perm']:
 
                user_model.grant_perm(id, 'hg.create.repository')
 
            else:
 
                user_model.grant_perm(id, 'hg.create.none')
 
            if form_result['create_user_group_perm']:
 
                user_model.grant_perm(id, 'hg.usergroup.create.true')
 
            else:
 
                user_model.grant_perm(id, 'hg.usergroup.create.false')
 
            if form_result['fork_repo_perm']:
 
                user_model.grant_perm(id, 'hg.fork.repository')
 
            else:
 
                user_model.grant_perm(id, 'hg.fork.none')
 
            h.flash(_("Updated permissions"), category='success')
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during permissions saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_user_perms', id=id))
 

	
 
    def edit_emails(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'emails'
 
        c.user_email_map = UserEmailMap.query() \
 
            .filter(UserEmailMap.user == c.user).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_email(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 
        email = request.POST.get('new_email')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_email(id, email)
 
            Session().commit()
 
            h.flash(_("Added email %s to user") % email, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def delete_email(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def edit_ips(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ips'
 
        c.user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == c.user).all()
 

	
 
        c.inherit_default_ips = c.user.inherit_default_permissions
 
        c.default_user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == User.get_default_user()).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_ip(self, id):
 
        ip = request.POST.get('new_ip')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_ip(id, ip)
 
            Session().commit()
 
            h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['ip']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred while adding IP address'),
 
                    category='error')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
 

	
 
    def delete_ip(self, id):
 
        ip_id = request.POST.get('del_ip_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_ip(id, ip_id)
 
        Session().commit()
 
        h.flash(_("Removed IP address from user whitelist"), category='success')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
kallithea/lib/auth.py
Show inline comments
 
@@ -87,241 +87,228 @@ class PasswordGenerator(object):
 
        while len(l) < length:
 
            i = ord(os.urandom(1))
 
            if i < len(alphabet):
 
                l.append(alphabet[i])
 
        return ''.join(l)
 

	
 

	
 
def get_crypt_password(password):
 
    """
 
    Cryptographic function used for password hashing based on pybcrypt
 
    or Python's own OpenSSL wrapper on windows
 

	
 
    :param password: password to hash
 
    """
 
    if is_windows:
 
        return hashlib.sha256(password).hexdigest()
 
    elif is_unix:
 
        import bcrypt
 
        return bcrypt.hashpw(safe_str(password), bcrypt.gensalt(10))
 
    else:
 
        raise Exception('Unknown or unsupported platform %s'
 
                        % __platform__)
 

	
 

	
 
def check_password(password, hashed):
 
    """
 
    Checks matching password with it's hashed value, runs different
 
    implementation based on platform it runs on
 

	
 
    :param password: password
 
    :param hashed: password in hashed form
 
    """
 

	
 
    if is_windows:
 
        return hashlib.sha256(password).hexdigest() == hashed
 
    elif is_unix:
 
        import bcrypt
 
        try:
 
            return bcrypt.checkpw(safe_str(password), safe_str(hashed))
 
        except ValueError as e:
 
            # bcrypt will throw ValueError 'Invalid hashed_password salt' on all password errors
 
            log.error('error from bcrypt checking password: %s', e)
 
            return False
 
    else:
 
        raise Exception('Unknown or unsupported platform %s'
 
                        % __platform__)
 

	
 

	
 
def _cached_perms_data(user_id, user_is_admin, user_inherit_default_permissions,
 
def _cached_perms_data(user_id, user_is_admin,
 
                       explicit):
 
    RK = 'repositories'
 
    GK = 'repositories_groups'
 
    UK = 'user_groups'
 
    GLOBAL = 'global'
 
    PERM_WEIGHTS = Permission.PERM_WEIGHTS
 
    permissions = {RK: {}, GK: {}, UK: {}, GLOBAL: set()}
 

	
 
    def _choose_perm(new_perm, cur_perm):
 
        new_perm_val = PERM_WEIGHTS[new_perm]
 
        cur_perm_val = PERM_WEIGHTS[cur_perm]
 
        if new_perm_val > cur_perm_val:
 
            return new_perm
 
        return cur_perm
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_user = User.get_by_username('default', cache=True)
 
    default_user_id = default_user.user_id
 

	
 
    default_repo_perms = Permission.get_default_perms(default_user_id)
 
    default_repo_groups_perms = Permission.get_default_group_perms(default_user_id)
 
    default_user_group_perms = Permission.get_default_user_group_perms(default_user_id)
 

	
 
    if user_is_admin:
 
        #==================================================================
 
        # admin users have all rights;
 
        # based on default permissions, just set everything to admin
 
        #==================================================================
 
        permissions[GLOBAL].add('hg.admin')
 
        permissions[GLOBAL].add('hg.create.write_on_repogroup.true')
 

	
 
        # repositories
 
        for perm in default_repo_perms:
 
            r_k = perm.UserRepoToPerm.repository.repo_name
 
            p = 'repository.admin'
 
            permissions[RK][r_k] = p
 

	
 
        # repository groups
 
        for perm in default_repo_groups_perms:
 
            rg_k = perm.UserRepoGroupToPerm.group.group_name
 
            p = 'group.admin'
 
            permissions[GK][rg_k] = p
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            permissions[UK][u_k] = p
 
        return permissions
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # default global permissions taken from the default user
 
    default_global_perms = UserToPerm.query() \
 
        .filter(UserToPerm.user_id == default_user_id) \
 
        .options(joinedload(UserToPerm.permission))
 

	
 
    for perm in default_global_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # defaults for repositories, taken from default user
 
    for perm in default_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        if perm.Repository.private and not (perm.Repository.owner_id == user_id):
 
            # disable defaults for private repos,
 
            p = 'repository.none'
 
        elif perm.Repository.owner_id == user_id:
 
            # set admin if owner
 
            p = 'repository.admin'
 
        else:
 
            p = perm.Permission.permission_name
 

	
 
        permissions[RK][r_k] = p
 

	
 
    # defaults for repository groups taken from default user permission
 
    # on given group
 
    for perm in default_repo_groups_perms:
 
        rg_k = perm.UserRepoGroupToPerm.group.group_name
 
        p = perm.Permission.permission_name
 
        permissions[GK][rg_k] = p
 

	
 
    # defaults for user groups taken from default user permission
 
    # on given user group
 
    for perm in default_user_group_perms:
 
        u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
        p = perm.Permission.permission_name
 
        permissions[UK][u_k] = p
 

	
 
    #======================================================================
 
    # !! OVERRIDE GLOBALS !! with user permissions if any found
 
    # !! Augment GLOBALS with user permissions if any found !!
 
    #======================================================================
 
    # those can be configured from groups or users explicitly
 
    _configurable = set([
 
        'hg.fork.none', 'hg.fork.repository',
 
        'hg.create.none', 'hg.create.repository',
 
        'hg.usergroup.create.false', 'hg.usergroup.create.true'
 
    ])
 

	
 
    # USER GROUPS comes first
 
    # user group global permissions
 
    user_perms_from_users_groups = Session().query(UserGroupToPerm) \
 
        .options(joinedload(UserGroupToPerm.permission)) \
 
        .join((UserGroupMember, UserGroupToPerm.users_group_id ==
 
               UserGroupMember.users_group_id)) \
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .join((UserGroup, UserGroupMember.users_group_id ==
 
               UserGroup.users_group_id)) \
 
        .filter(UserGroup.users_group_active == True) \
 
        .order_by(UserGroupToPerm.users_group_id) \
 
        .all()
 
    # need to group here by groups since user can be in more than
 
    # one group
 
    _grouped = [[x, list(y)] for x, y in
 
                itertools.groupby(user_perms_from_users_groups,
 
                                  lambda x:x.users_group)]
 
    for gr, perms in _grouped:
 
        # since user can be in multiple groups iterate over them and
 
        # select the lowest permissions first (more explicit)
 
        # TODO: do this^^
 
        if not gr.inherit_default_permissions:
 
            # NEED TO IGNORE all configurable permissions and
 
            # replace them with explicitly set
 
            permissions[GLOBAL] = permissions[GLOBAL] \
 
                                            .difference(_configurable)
 
        for perm in perms:
 
            permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # user specific global permissions
 
    user_perms = Session().query(UserToPerm) \
 
            .options(joinedload(UserToPerm.permission)) \
 
            .filter(UserToPerm.user_id == user_id).all()
 

	
 
    if not user_inherit_default_permissions:
 
        # NEED TO IGNORE all configurable permissions and
 
        # replace them with explicitly set
 
        permissions[GLOBAL] = permissions[GLOBAL] \
 
                                        .difference(_configurable)
 

	
 
        for perm in user_perms:
 
            permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # for each kind of global permissions, only keep the one with heighest weight
 
    kind_max_perm = {}
 
    for perm in sorted(permissions[GLOBAL], key=lambda n: PERM_WEIGHTS[n]):
 
        kind = perm.rsplit('.', 1)[0]
 
        kind_max_perm[kind] = perm
 
    permissions[GLOBAL] = set(kind_max_perm.values())
 
    ## END GLOBAL PERMISSIONS
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORIES !!
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository and
 
    # fill in his permission from it.
 
    #======================================================================
 

	
 
    # user group for repositories permissions
 
    user_repo_perms_from_users_groups = \
 
     Session().query(UserGroupRepoToPerm, Permission, Repository,) \
 
        .join((Repository, UserGroupRepoToPerm.repository_id ==
 
               Repository.repo_id)) \
 
        .join((Permission, UserGroupRepoToPerm.permission_id ==
 
               Permission.permission_id)) \
 
        .join((UserGroup, UserGroupRepoToPerm.users_group_id ==
 
               UserGroup.users_group_id)) \
 
        .filter(UserGroup.users_group_active == True) \
 
        .join((UserGroupMember, UserGroupRepoToPerm.users_group_id ==
 
               UserGroupMember.users_group_id)) \
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_repo_perms_from_users_groups:
 
        r_k = perm.UserGroupRepoToPerm.repository.repo_name
 
        multiple_counter[r_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[RK][r_k]
 

	
 
        if perm.Repository.owner_id == user_id:
 
            # set admin if owner
 
            p = 'repository.admin'
 
        else:
 
            if multiple_counter[r_k] > 1:
 
                p = _choose_perm(p, cur_perm)
 
        permissions[RK][r_k] = p
 

	
 
    # user explicit permissions for repositories, overrides any specified
 
    # by the group permission
 
    user_repo_perms = Permission.get_default_perms(user_id)
 
    for perm in user_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        cur_perm = permissions[RK][r_k]
 
        # set admin if owner
 
        if perm.Repository.owner_id == user_id:
 
@@ -440,97 +427,96 @@ class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
    but is also used as a generic user information data structure in
 
    parts of the code, e.g. user management.
 

	
 
    Constructed from a database `User` object, a user ID or cookie dict,
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users and the constructor
 
    sets the `is_authenticated` field to False. It's up to other parts
 
    of the code to check e.g. if a supplied password is correct, and if
 
    so, set `is_authenticated` to True.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
    "no user". On the other hand, if the default user is disabled (and
 
    there is no login information), we instead get "no user"; this should
 
    only happen on the login page (as all other requests are redirected).
 

	
 
    `is_default_user` specifically checks if the AuthUser is the user named
 
    "default". Use `is_anonymous` to check for both "default" and "no user".
 
    """
 

	
 
    def __init__(self, user_id=None, dbuser=None, authenticating_api_key=None,
 
            is_external_auth=False):
 

	
 
        self.is_authenticated = False
 
        self.is_external_auth = is_external_auth
 
        self.authenticating_api_key = authenticating_api_key
 

	
 
        # These attributes will be overridden by fill_data, below, unless the
 
        # requested user cannot be found and the default anonymous user is
 
        # not enabled.
 
        self.user_id = None
 
        self.username = None
 
        self.api_key = None
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.admin = False
 
        self.inherit_default_permissions = False
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            log.debug('Auth User lookup by USER ID %s', user_id)
 
            dbuser = UserModel().get(user_id)
 
        else:
 
            # Note: dbuser is allowed to be None.
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
        is_user_loaded = self._fill_data(dbuser)
 

	
 
        # If user cannot be found, try falling back to anonymous.
 
        if is_user_loaded:
 
            assert dbuser is not None
 
            self.is_default_user = dbuser.is_default_user
 
        else:
 
            default_user = User.get_default_user(cache=True)
 
            is_user_loaded = self._fill_data(default_user)
 
            self.is_default_user = is_user_loaded
 

	
 
        self.is_anonymous = not is_user_loaded or self.is_default_user
 

	
 
        if not self.username:
 
            self.username = 'None'
 

	
 
        log.debug('Auth User is now %s', self)
 

	
 
    def _fill_data(self, dbuser):
 
        """
 
        Copies database fields from a `db.User` to this `AuthUser`. Does
 
        not copy `api_keys` and `permissions` attributes.
 

	
 
        Checks that `dbuser` is `active` (and not None) before copying;
 
        returns True on success.
 
        """
 
        if dbuser is not None and dbuser.active:
 
            log.debug('filling %s data', dbuser)
 
            for k, v in dbuser.get_dict().iteritems():
 
                assert k not in ['api_keys', 'permissions']
 
                setattr(self, k, v)
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def permissions(self):
 
        return self.__get_perms(user=self, cache=False)
 

	
 
    def has_repository_permission_level(self, repo_name, level, purpose=None):
 
@@ -542,205 +528,198 @@ class AuthUser(object):
 
        actual_perm = self.permissions['repositories'].get(repo_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
 
            self.username, level, repo_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['group.read', 'group.write', 'group.admin'],
 
            'write': ['group.write', 'group.admin'],
 
            'admin': ['group.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories_groups'].get(repo_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
 
            self.username, level, repo_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_user_group_permission_level(self, user_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
 
            'write': ['usergroup.write', 'usergroup.admin'],
 
            'admin': ['usergroup.admin'],
 
        }[level]
 
        actual_perm = self.permissions['user_groups'].get(user_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
 
            self.username, level, user_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    @property
 
    def api_keys(self):
 
        return self._get_api_keys()
 

	
 
    def __get_perms(self, user, explicit=True, cache=False):
 
        """
 
        Fills user permission attribute with permissions taken from database
 
        works for permissions given for repositories, and for permissions that
 
        are granted to groups
 

	
 
        :param user: `AuthUser` instance
 
        :param explicit: In case there are permissions both for user and a group
 
            that user is part of, explicit flag will define if user will
 
            explicitly override permissions from group, if it's False it will
 
            compute the decision
 
        """
 
        user_id = user.user_id
 
        user_is_admin = user.is_admin
 
        user_inherit_default_permissions = user.inherit_default_permissions
 

	
 
        log.debug('Getting PERMISSION tree')
 
        compute = conditional_cache('short_term', 'cache_desc',
 
                                    condition=cache, func=_cached_perms_data)
 
        return compute(user_id, user_is_admin,
 
                       user_inherit_default_permissions, explicit)
 
        return compute(user_id, user_is_admin, explicit)
 

	
 
    def _get_api_keys(self):
 
        api_keys = [self.api_key]
 
        for api_key in UserApiKeys.query() \
 
                .filter_by(user_id=self.user_id, is_expired=False):
 
            api_keys.append(api_key.api_key)
 

	
 
        return api_keys
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @property
 
    def repositories_admin(self):
 
        """
 
        Returns list of repositories you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories'].iteritems()
 
                if x[1] == 'repository.admin']
 

	
 
    @property
 
    def repository_groups_admin(self):
 
        """
 
        Returns list of repository groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories_groups'].iteritems()
 
                if x[1] == 'group.admin']
 

	
 
    @property
 
    def user_groups_admin(self):
 
        """
 
        Returns list of user groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['user_groups'].iteritems()
 
                if x[1] == 'usergroup.admin']
 

	
 
    @staticmethod
 
    def check_ip_allowed(user, ip_addr):
 
        """
 
        Check if the given IP address (a `str`) is allowed for the given
 
        user (an `AuthUser` or `db.User`).
 
        """
 
        allowed_ips = AuthUser.get_allowed_ips(user.user_id, cache=True,
 
            inherit_from_default=user.inherit_default_permissions)
 
        allowed_ips = AuthUser.get_allowed_ips(user.user_id, cache=True)
 
        if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
 
            log.debug('IP:%s is in range of %s', ip_addr, allowed_ips)
 
            return True
 
        else:
 
            log.info('Access for IP:%s forbidden, '
 
                     'not in %s' % (ip_addr, allowed_ips))
 
            return False
 

	
 
    def __repr__(self):
 
        return "<AuthUser('id:%s[%s] auth:%s')>" \
 
            % (self.user_id, self.username, (self.is_authenticated or self.is_default_user))
 

	
 
    def to_cookie(self):
 
        """ Serializes this login session to a cookie `dict`. """
 
        return {
 
            'user_id': self.user_id,
 
            'is_external_auth': self.is_external_auth,
 
        }
 

	
 
    @staticmethod
 
    def from_cookie(cookie):
 
        """
 
        Deserializes an `AuthUser` from a cookie `dict`.
 
        """
 

	
 
        au = AuthUser(
 
            user_id=cookie.get('user_id'),
 
            is_external_auth=cookie.get('is_external_auth', False),
 
        )
 
        au.is_authenticated = True
 
        return au
 

	
 
    @classmethod
 
    def get_allowed_ips(cls, user_id, cache=False, inherit_from_default=False):
 
    def get_allowed_ips(cls, user_id, cache=False):
 
        _set = set()
 

	
 
        if inherit_from_default:
 
            default_ips = UserIpMap.query().filter(UserIpMap.user_id ==
 
                                            User.get_default_user(cache=True).user_id)
 
            if cache:
 
                default_ips = default_ips.options(FromCache("sql_cache_short",
 
                                                  "get_user_ips_default"))
 

	
 
            # populate from default user
 
            for ip in default_ips:
 
                try:
 
                    _set.add(ip.ip_addr)
 
                except ObjectDeletedError:
 
                    # since we use heavy caching sometimes it happens that we get
 
                    # deleted objects here, we just skip them
 
                    pass
 

	
 
        user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
 
        if cache:
 
            user_ips = user_ips.options(FromCache("sql_cache_short",
 
                                                  "get_user_ips_%s" % user_id))
 

	
 
        for ip in user_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 
        return _set or set(['0.0.0.0/0', '::/0'])
 

	
 

	
 
def set_available_permissions(config):
 
    """
 
    This function will propagate globals with all available defined
 
    permission given in db. We don't want to check each time from db for new
 
    permissions since adding a new permission also requires application restart
 
    ie. to decorate new views with the newly created permission
 

	
 
    :param config: current config instance
 

	
 
    """
 
    log.info('getting information about all available permissions')
 
    try:
 
        all_perms = Session().query(Permission).all()
 
        config['available_permissions'] = [x.permission_name for x in all_perms]
 
    finally:
 
        Session.remove()
 

	
 

	
 
#==============================================================================
 
# CHECK DECORATORS
 
#==============================================================================
 

	
 
def _redirect_to_login(message=None):
 
    """Return an exception that must be raised. It will redirect to the login
 
    page which will redirect back to the current URL after authentication.
 
    The optional message will be shown in a flash message."""
 
    from kallithea.lib import helpers as h
 
    if message:
 
        h.flash(message, category='warning')
 
    p = request.path_qs
 
    log.debug('Redirecting to login page, origin: %s', p)
 
    return HTTPFound(location=url('login_home', came_from=p))
 

	
 

	
 
# Use as decorator
 
class LoginRequired(object):
 
    """Client must be logged in as a valid User, or we'll redirect to the login
 
    page.
kallithea/lib/hooks.py
Show inline comments
 
@@ -167,170 +167,168 @@ def log_create_repository(repository_dic
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'CREATE_REPO_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(repository_dict)
 
        kw.update({'created_by': created_by})
 
        kw.update(kwargs)
 
        return callback(**kw)
 

	
 
    return 0
 

	
 

	
 
def check_allowed_create_user(user_dict, created_by, **kwargs):
 
    # pre create hooks
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'PRE_CREATE_USER_HOOK', None)
 
    if callable(callback):
 
        allowed, reason = callback(created_by=created_by, **user_dict)
 
        if not allowed:
 
            raise UserCreationError(reason)
 

	
 

	
 
def log_create_user(user_dict, created_by, **kwargs):
 
    """
 
    Post create user Hook.
 

	
 
    :param user_dict: dict dump of user object
 

	
 
    available keys for user_dict:
 

	
 
     'username',
 
     'full_name_or_username',
 
     'full_contact',
 
     'user_id',
 
     'name',
 
     'firstname',
 
     'short_contact',
 
     'admin',
 
     'lastname',
 
     'ip_addresses',
 
     'ldap_dn',
 
     'email',
 
     'api_key',
 
     'last_login',
 
     'full_name',
 
     'active',
 
     'password',
 
     'emails',
 
     'inherit_default_permissions'
 

	
 
    """
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'CREATE_USER_HOOK', None)
 
    if callable(callback):
 
        return callback(created_by=created_by, **user_dict)
 

	
 
    return 0
 

	
 

	
 
def log_delete_repository(repository_dict, deleted_by, **kwargs):
 
    """
 
    Post delete repository Hook.
 

	
 
    :param repository: dict dump of repository object
 
    :param deleted_by: username who deleted the repository
 

	
 
    available keys of repository_dict:
 

	
 
     'repo_type',
 
     'description',
 
     'private',
 
     'created_on',
 
     'enable_downloads',
 
     'repo_id',
 
     'owner_id',
 
     'enable_statistics',
 
     'clone_uri',
 
     'fork_id',
 
     'group_id',
 
     'repo_name'
 

	
 
    """
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'DELETE_REPO_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(repository_dict)
 
        kw.update({'deleted_by': deleted_by,
 
                   'deleted_on': time.time()})
 
        kw.update(kwargs)
 
        return callback(**kw)
 

	
 
    return 0
 

	
 

	
 
def log_delete_user(user_dict, deleted_by, **kwargs):
 
    """
 
    Post delete user Hook.
 

	
 
    :param user_dict: dict dump of user object
 

	
 
    available keys for user_dict:
 

	
 
     'username',
 
     'full_name_or_username',
 
     'full_contact',
 
     'user_id',
 
     'name',
 
     'firstname',
 
     'short_contact',
 
     'admin',
 
     'lastname',
 
     'ip_addresses',
 
     'ldap_dn',
 
     'email',
 
     'api_key',
 
     'last_login',
 
     'full_name',
 
     'active',
 
     'password',
 
     'emails',
 
     'inherit_default_permissions'
 

	
 
    """
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'DELETE_USER_HOOK', None)
 
    if callable(callback):
 
        return callback(deleted_by=deleted_by, **user_dict)
 

	
 
    return 0
 

	
 

	
 
def _hook_environment(repo_path):
 
    """
 
    Create a light-weight environment for stand-alone scripts and return an UI and the
 
    db repository.
 

	
 
    Git hooks are executed as subprocess of Git while Kallithea is waiting, and
 
    they thus need enough info to be able to create an app environment and
 
    connect to the database.
 
    """
 
    from paste.deploy import appconfig
 
    from sqlalchemy import engine_from_config
 
    from kallithea.config.environment import load_environment
 
    from kallithea.model.base import init_model
 

	
 
    extras = _extract_extras()
 
    ini_file_path = extras['config']
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    app_conf = appconfig('config:%s' % ini_file_path)
 
    conf = load_environment(app_conf.global_conf, app_conf.local_conf)
 

	
 
    setup_cache_regions(conf)
 

	
 
    engine = engine_from_config(conf, 'sqlalchemy.')
 
    init_model(engine)
 

	
 
    repo_path = safe_unicode(repo_path)
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
 

	
 
    repo = Repository.get_by_full_path(repo_path)
 
    if not repo:
 
        raise OSError('Repository %s not found in database'
 
                      % (safe_str(repo_path)))
 

	
 
    baseui = make_ui()
 
    return baseui, repo
 

	
kallithea/model/db.py
Show inline comments
 
@@ -381,97 +381,96 @@ class Ui(Base, BaseDbModel):
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s[%s]%s=>%s]>' % (self.__class__.__name__, self.ui_section,
 
                                    self.ui_key, self.ui_value)
 

	
 

	
 
class User(Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    inherit_default_permissions = Column(Boolean(), nullable=False, default=True)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
@@ -775,97 +774,96 @@ class UserIpMap(Base, BaseDbModel):
 
        return dict(
 
          ip_addr=self.ip_addr,
 
          ip_range=self._get_ip_range(self.ip_addr)
 
        )
 

	
 
    def __unicode__(self):
 
        return u"<%s('user_id:%s=>%s')>" % (self.__class__.__name__,
 
                                            self.user_id, self.ip_addr)
 

	
 

	
 
class UserLog(Base, BaseDbModel):
 
    __tablename__ = 'user_logs'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    user_log_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
 
    username = Column(String(255), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    repository_name = Column(Unicode(255), nullable=False)
 
    user_ip = Column(String(255), nullable=True)
 
    action = Column(UnicodeText(), nullable=False)
 
    action_date = Column(DateTime(timezone=False), nullable=False)
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.repository_name,
 
                                      self.action)
 

	
 
    @property
 
    def action_as_day(self):
 
        return datetime.date(*self.action_date.timetuple()[:3])
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository', cascade='')
 

	
 

	
 
class UserGroup(Base, BaseDbModel):
 
    __tablename__ = 'users_groups'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_id = Column(Integer(), primary_key=True)
 
    users_group_name = Column(Unicode(255), nullable=False, unique=True)
 
    user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
 
    users_group_active = Column(Boolean(), nullable=False)
 
    inherit_default_permissions = Column("users_group_inherit_default_permissions", Boolean(), nullable=False, default=True)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _group_data = Column("group_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    members = relationship('UserGroupMember', cascade="all, delete-orphan")
 
    users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
 
    users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    user_user_group_to_perm = relationship('UserUserGroupToPerm ', cascade='all')
 
    user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm ', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def group_data(self):
 
        if not self._group_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._group_data)
 
        except TypeError:
 
            return {}
 

	
 
    @group_data.setter
 
    def group_data(self, val):
 
        try:
 
            self._group_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.users_group_id,
 
                                      self.users_group_name)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False,
 
                          case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.users_group_name) == func.lower(group_name))
 
        else:
 
            q = cls.query().filter(cls.users_group_name == group_name)
 
        if cache:
 
            q = q.options(FromCache(
kallithea/model/forms.py
Show inline comments
 
@@ -379,97 +379,96 @@ def ApplicationUiSettingsForm():
 
    class _ApplicationUiSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        paths_root_path = All(
 
            v.ValidPath(),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        hooks_changegroup_update = v.StringBoolean(if_missing=False)
 
        hooks_changegroup_repo_size = v.StringBoolean(if_missing=False)
 

	
 
        extensions_largefiles = v.StringBoolean(if_missing=False)
 
        extensions_hgsubversion = v.StringBoolean(if_missing=False)
 
        extensions_hggit = v.StringBoolean(if_missing=False)
 

	
 
    return _ApplicationUiSettingsForm
 

	
 

	
 
def DefaultPermissionsForm(repo_perms_choices, group_perms_choices,
 
                           user_group_perms_choices, create_choices,
 
                           create_on_write_choices, repo_group_create_choices,
 
                           user_group_create_choices, fork_choices,
 
                           register_choices, extern_activate_choices):
 
    class _DefaultPermissionsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        overwrite_default_repo = v.StringBoolean(if_missing=False)
 
        overwrite_default_group = v.StringBoolean(if_missing=False)
 
        overwrite_default_user_group = v.StringBoolean(if_missing=False)
 
        anonymous = v.StringBoolean(if_missing=False)
 
        default_repo_perm = v.OneOf(repo_perms_choices)
 
        default_group_perm = v.OneOf(group_perms_choices)
 
        default_user_group_perm = v.OneOf(user_group_perms_choices)
 

	
 
        default_repo_create = v.OneOf(create_choices)
 
        create_on_write = v.OneOf(create_on_write_choices)
 
        default_user_group_create = v.OneOf(user_group_create_choices)
 
        #default_repo_group_create = v.OneOf(repo_group_create_choices) #not impl. yet
 
        default_fork = v.OneOf(fork_choices)
 

	
 
        default_register = v.OneOf(register_choices)
 
        default_extern_activate = v.OneOf(extern_activate_choices)
 
    return _DefaultPermissionsForm
 

	
 

	
 
def CustomDefaultPermissionsForm():
 
    class _CustomDefaultPermissionsForm(formencode.Schema):
 
        filter_extra_fields = True
 
        allow_extra_fields = True
 
        inherit_default_permissions = v.StringBoolean(if_missing=False)
 

	
 
        create_repo_perm = v.StringBoolean(if_missing=False)
 
        create_user_group_perm = v.StringBoolean(if_missing=False)
 
        #create_repo_group_perm Impl. later
 

	
 
        fork_repo_perm = v.StringBoolean(if_missing=False)
 

	
 
    return _CustomDefaultPermissionsForm
 

	
 

	
 
def DefaultsForm(edit=False, old_data=None, supported_backends=BACKENDS.keys()):
 
    class _DefaultsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        default_repo_type = v.OneOf(supported_backends)
 
        default_repo_private = v.StringBoolean(if_missing=False)
 
        default_repo_enable_statistics = v.StringBoolean(if_missing=False)
 
        default_repo_enable_downloads = v.StringBoolean(if_missing=False)
 

	
 
    return _DefaultsForm
 

	
 

	
 
def AuthSettingsForm(current_active_modules):
 
    class _AuthSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        auth_plugins = All(v.ValidAuthPlugins(),
 
                           v.UniqueListFromString()(not_empty=True))
 

	
 
        def __init__(self, *args, **kwargs):
 
            # The auth plugins tell us what form validators they use
 
            if current_active_modules:
 
                import kallithea.lib.auth_modules
 
                from kallithea.lib.auth_modules import LazyFormencode
 
                for module in current_active_modules:
 
                    plugin = kallithea.lib.auth_modules.loadplugin(module)
 
                    plugin_name = plugin.name
 
                    for sv in plugin.plugin_settings():
 
                        newk = "auth_%s_%s" % (plugin_name, sv["name"])
 
                        # can be a LazyFormencode object from plugin settings
 
                        validator = sv["validator"]
 
                        if isinstance(validator, LazyFormencode):
 
                            validator = validator()
 
                        # init all lazy validators from formencode.All
 
                        if isinstance(validator, All):
 
                            init_validators = []
 
                            for validator in validator.validators:
 
                                if isinstance(validator, LazyFormencode):
kallithea/templates/admin/users/user_edit_ips.html
Show inline comments
 
<table class="table">
 
    %if c.default_user_ip_map and c.inherit_default_ips:
 
    %if c.default_user_ip_map:
 
        %for ip in c.default_user_ip_map:
 
          <tr>
 
            <td><div class="ip">${ip.ip_addr}</div></td>
 
            <td><div class="ip">${h.ip_range(ip.ip_addr)}</div></td>
 
            <td>${h.HTML(_('Inherited from %s')) % h.link_to('*default*',h.url('admin_permissions_ips'))}</td>
 
          </tr>
 
        %endfor
 
    %endif
 

	
 
    %if c.user_ip_map:
 
        %for ip in c.user_ip_map:
 
          <tr>
 
            <td><div class="ip">${ip.ip_addr}</div></td>
 
            <td><div class="ip">${h.ip_range(ip.ip_addr)}</div></td>
 
            <td>
 
                ${h.form(url('edit_user_ips_delete', id=c.user.user_id))}
 
                    ${h.hidden('del_ip_id',ip.ip_id)}
 
                    <i class="icon-trashcan"></i>
 
                    ${h.submit('remove_',_('Delete'),id="remove_ip_%s" % ip.ip_id,
 
                        class_="btn btn-default btn-xs", onclick="return confirm('"+_('Confirm to delete this IP address: %s') % ip.ip_addr+"');")}
 
                ${h.end_form()}
 
            </td>
 
          </tr>
 
        %endfor
 
    %endif
 
    %if not c.default_user_ip_map and not c.user_ip_map:
 
        <tr><td><div class="ip">${_('All IP addresses are allowed.')}</div></td></tr>
 
    %endif
 
</table>
 

	
 
<div>
 
    ${h.form(url('edit_user_ips_update', id=c.user.user_id))}
 
    <div class="form">
 
            <div class="form-group">
 
                <label class="control-label" for="new_ip">${_('New IP address')}:</label>
 
                <div>
 
                    ${h.text('new_ip', class_='form-control')}
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <div class="buttons">
 
                    ${h.submit('save',_('Add'),class_="btn btn-default")}
 
                    ${h.reset('reset',_('Reset'),class_="btn btn-default")}
 
                </div>
 
            </div>
 
    </div>
 
    ${h.end_form()}
 
</div>
kallithea/templates/base/default_perms_box.html
Show inline comments
 
## snippet for displaying default permission box
 
## usage:
 
##    <%namespace name="dpb" file="/base/default_perms_box.html"/>
 
##    ${dpb.default_perms_box(<url_to_form>)}
 

	
 

	
 
<%def name="default_perms_box(form_url)">
 
${h.form(form_url)}
 
    <div class="form">
 
            <div class="form-group">
 
                <label class="control-label" for="inherit_default_permissions">${_('Inherit defaults')}:</label>
 
                <div>
 
                    ${h.checkbox('inherit_default_permissions',value=True)}
 
                    <span class="help-block">
 
                        ${(h.HTML(_('Select to inherit global settings, IP whitelist and permissions from the %s.'))
 
                                % h.link_to(_('default permissions'), url('admin_permissions')))}
 
                    </span>
 
                </div>
 
            </div>
 

	
 
            <div id="inherit_overlay">
 
            <div class="form-group">
 
                <label class="control-label" for="create_repo_perm">${_('Create repositories')}:</label>
 
                <div>
 
                    ${h.checkbox('create_repo_perm',value=True)}
 
                    <span class="help-block">
 
                        ${_('Select this option to allow repository creation for this user')}
 
                    </span>
 
                </div>
 
            </div>
 

	
 
            <div class="form-group">
 
                <label class="control-label" for="create_user_group_perm">${_('Create user groups')}:</label>
 
                <div>
 
                    ${h.checkbox('create_user_group_perm',value=True)}
 
                    <span class="help-block">
 
                        ${_('Select this option to allow user group creation for this user')}
 
                    </span>
 
                </div>
 
            </div>
 

	
 
            <div class="form-group">
 
                <label class="control-label" for="fork_repo_perm">${_('Fork repositories')}:</label>
 
                <div>
 
                    ${h.checkbox('fork_repo_perm',value=True)}
 
                    <span class="help-block">
 
                        ${_('Select this option to allow repository forking for this user')}
 
                    </span>
 
                </div>
 
            </div>
 

	
 
            </div>
 

	
 
            <div class="form-group">
 
                <div class="buttons">
 
                    ${h.submit('save',_('Save'),class_="btn btn-default")}
 
                    ${h.reset('reset',_('Reset'),class_="btn btn-default")}
 
                </div>
 
            </div>
 
    </div>
 
${h.end_form()}
 

	
 
## JS
 
<script>
 
$(document).ready(function(e){
 
    var show_custom_perms = function(inherit_default){
 
        if(inherit_default){
 
            $('#inherit_overlay').hide();
 
        }else{
 
            $('#inherit_overlay').show();
 
        }
 
    };
 

	
 
    show_custom_perms($('#inherit_default_permissions').prop('checked'));
 
    $('#inherit_default_permissions').change(function(){
 
        show_custom_perms($('#inherit_default_permissions').prop('checked'));
 
    });
 
});
 
</script>
 

	
 
</%def>
kallithea/tests/functional/test_forks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import urllib
 
import unittest
 

	
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 

	
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.model.db import Repository
 
from kallithea.model.db import Repository, User
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 

	
 
fixture = Fixture()
 

	
 

	
 
class _BaseTestCase(TestController):
 
    """
 
    Write all tests here
 
    """
 
    REPO = None
 
    REPO_TYPE = None
 
    NEW_REPO = None
 
    REPO_FORK = None
 

	
 
    def setup_method(self, method):
 
        self.username = u'forkuser'
 
        self.password = u'qweqwe'
 
        u1 = fixture.create_user(self.username, password=self.password, email=u'fork_king@example.com')
 
        self.u1_id = u1.user_id
 
        Session().commit()
 

	
 
    def teardown_method(self, method):
 
        fixture.destroy_user(self.u1_id)
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        repo_name = self.REPO
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain("""There are no forks yet""")
 

	
 
    def test_no_permissions_to_fork(self):
 
        usr = self.log_user(TEST_USER_REGULAR_LOGIN,
 
                            TEST_USER_REGULAR_PASS)['user_id']
 
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)['user_id']
 
        try:
 
            user_model = UserModel()
 
            usr = User.get_default_user()
 
            user_model.revoke_perm(usr, 'hg.fork.repository')
 
            user_model.grant_perm(usr, 'hg.fork.none')
 
            u = UserModel().get(usr)
 
            u.inherit_default_permissions = False
 
            Session().commit()
 
            # try create a fork
 
            repo_name = self.REPO
 
            self.app.post(url(controller='forks', action='fork_create',
 
                              repo_name=repo_name), {'_authentication_token': self.authentication_token()}, status=403)
 
        finally:
 
            usr = User.get_default_user()
 
            user_model.revoke_perm(usr, 'hg.fork.none')
 
            user_model.grant_perm(usr, 'hg.fork.repository')
 
            Session().commit()
 

	
 
    def test_index_with_fork(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        fork_name = self.REPO_FORK
 
        description = 'fork of vcs test'
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': u'-1',
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': description,
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 

	
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (fork_name, fork_name)
 
        )
 

	
 
        # remove this fork
 
        response = self.app.post(url('delete_repo', repo_name=fork_name),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
    def test_fork_create_into_group(self):
 
        self.log_user()
 
        group = fixture.create_repo_group(u'vc')
 
        group_id = group.group_id
 
        fork_name = self.REPO_FORK
 
        fork_name_full = 'vc/%s' % fork_name
 
        description = 'fork of vcs test'
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': group_id,
kallithea/tests/models/test_permissions.py
Show inline comments
 
@@ -265,375 +265,357 @@ class TestPermissions(TestController):
 
    def test_repo_group_user_as_user_group_member(self):
 
        # create Group1
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.read'}
 

	
 
        # set default permission to none
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1,
 
                                               user=self.anon,
 
                                               perm='group.none')
 
        # make group
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        # add user to group
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 
        Session().commit()
 

	
 
        # check if user is in the group
 
        members = [x.user_id for x in UserGroupModel().get(self.ug1.users_group_id).members]
 
        assert members == [self.u1.user_id]
 
        # add some user to that group
 

	
 
        # check his permissions
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.none'}
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.none'}
 

	
 
        # grant ug1 read permissions for
 
        RepoGroupModel().grant_user_group_permission(repo_group=self.g1,
 
                                                      group_name=self.ug1,
 
                                                      perm='group.read')
 
        Session().commit()
 
        # check if the
 
        obj = Session().query(UserGroupRepoGroupToPerm) \
 
            .filter(UserGroupRepoGroupToPerm.group == self.g1) \
 
            .filter(UserGroupRepoGroupToPerm.users_group == self.ug1) \
 
            .scalar()
 
        assert obj.permission.permission_name == 'group.read'
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        assert a1_auth.permissions['repositories_groups'] == {u'group1': u'group.none'}
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.read'}
 

	
 
    def test_inherited_permissions_from_default_on_user_enabled(self):
 
    def test_inherit_nice_permissions_from_default_user(self):
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 
        # make sure inherit flag is turned on
 
        self.u1.inherit_default_permissions = True
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited permissions from default user
 
        assert u1_auth.permissions['global'] == set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inherited_permissions_from_default_on_user_disabled(self):
 
    def test_inherit_sad_permissions_from_default_user(self):
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 
        # make sure inherit flag is turned on
 
        self.u1.inherit_default_permissions = True
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited permissions from default user
 
        assert u1_auth.permissions['global'] == set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true'])
 

	
 
    def test_non_inherited_permissions_from_default_on_user_enabled(self):
 
    def test_inherit_more_permissions_from_default_user(self):
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 

	
 
        # disable global perms on specific user
 
        user_model.revoke_perm(self.u1, 'hg.create.repository')
 
        user_model.grant_perm(self.u1, 'hg.create.none')
 
        user_model.revoke_perm(self.u1, 'hg.fork.repository')
 
        user_model.grant_perm(self.u1, 'hg.fork.none')
 

	
 
        # make sure inherit flag is turned off
 
        self.u1.inherit_default_permissions = False
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have non inherited permissions from he's
 
        # explicitly set permissions
 
        assert u1_auth.permissions['global'] == set(['hg.create.none', 'hg.fork.none',
 
        # this user will have inherited more permissions from default user
 
        assert u1_auth.permissions['global'] == set([
 
                              'hg.create.repository',
 
                              'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true'])
 

	
 
    def test_non_inherited_permissions_from_default_on_user_disabled(self):
 
    def test_inherit_less_permissions_from_default_user(self):
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        # enable global perms on specific user
 
        user_model.revoke_perm(self.u1, 'hg.create.none')
 
        user_model.grant_perm(self.u1, 'hg.create.repository')
 
        user_model.revoke_perm(self.u1, 'hg.fork.none')
 
        user_model.grant_perm(self.u1, 'hg.fork.repository')
 

	
 
        # make sure inherit flag is turned off
 
        self.u1.inherit_default_permissions = False
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have non inherited permissions from he's
 
        # explicitly set permissions
 
        assert u1_auth.permissions['global'] == set(['hg.create.repository', 'hg.fork.repository',
 
        # this user will have inherited less permissions from default user
 
        assert u1_auth.permissions['global'] == set([
 
                              'hg.create.repository',
 
                              'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inactive_user_group_does_not_affect_global_permissions(self):
 
        # Issue #138: Inactive User Groups affecting permissions
 
        # Add user to inactive user group, set specific permissions on user
 
        # group and disable inherit-from-default. User permissions should still
 
        # inherit from default.
 
        # group and and verify it really is inactive.
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # enable fork and create on user group
 
        user_group_model.revoke_perm(self.ug1, perm='hg.create.none')
 
        user_group_model.grant_perm(self.ug1, perm='hg.create.repository')
 
        user_group_model.revoke_perm(self.ug1, perm='hg.fork.none')
 
        user_group_model.grant_perm(self.ug1, perm='hg.fork.repository')
 

	
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        assert u1_auth.permissions['global'] == set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inactive_user_group_does_not_affect_global_permissions_inverse(self):
 
        # Issue #138: Inactive User Groups affecting permissions
 
        # Add user to inactive user group, set specific permissions on user
 
        # group and disable inherit-from-default. User permissions should still
 
        # inherit from default.
 
        # group and and verify it really is inactive.
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # disable fork and create on user group
 
        user_group_model.revoke_perm(self.ug1, perm='hg.create.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.create.none')
 
        user_group_model.revoke_perm(self.ug1, perm='hg.fork.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.fork.none')
 

	
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        assert u1_auth.permissions['global'] == set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable admin access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.admin')
 
        # enable only write access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.write'
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable only write access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.write')
 
        # enable admin access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable admin access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.admin')
 
        # enable only write access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.write'}
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable only write access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.write')
 
        # enable admin access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.admin'}
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group(u'G2')
 

	
 
        # enable admin access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.admin')
 
        # enable only write access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['user_groups'][u'G1'] == u'usergroup.read'
 
        assert u1_auth.permissions['user_groups'][u'G2'] == u'usergroup.write'
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group(u'G2')
 

	
 
        # enable only write access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.write')
 
        # enable admin access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['user_groups'][u'G1'] == u'usergroup.read'
 
        assert u1_auth.permissions['user_groups'][u'G2'] == u'usergroup.admin'
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_group(self):
 
        # create repo as USER,
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        # he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        # set his permission as user group, he should still be admin
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                 group_name=self.ug1,
 
                                                 perm='repository.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_others(self):
 
        # create repo as USER,
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        # he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
0 comments (0 inline, 0 general)