Files @ a3b1016855f8
Branch filter:

Location: kallithea/rhodecode/model/users_group.py - annotation

Marcin Kuzminski
fixed tests
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
5f1850e4712a
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
89efedac4e6c
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
b76bb93db070
fa6ba6727475
7e3d89d9d3a2
fa6ba6727475
2ccb32ddcfd7
92a4f7c496a5
2ccb32ddcfd7
7d1fc253549e
fa6ba6727475
2ccb32ddcfd7
fa6ba6727475
9d4b80743a2a
7e3d89d9d3a2
fa6ba6727475
fa6ba6727475
87f0800abc7b
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
a8f520540ab0
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7d1fc253549e
fa6ba6727475
2ccb32ddcfd7
17893d61792a
7e3d89d9d3a2
17893d61792a
7d1fc253549e
fa6ba6727475
2ccb32ddcfd7
7e3d89d9d3a2
8ecc6b8229a5
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
46b17730ca32
46b17730ca32
46b17730ca32
46b17730ca32
7e3d89d9d3a2
10b4e34841a4
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
7e3d89d9d3a2
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
fa6ba6727475
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
10b4e34841a4
8ecc6b8229a5
8ecc6b8229a5
54687aa00724
87f0800abc7b
87f0800abc7b
4c78a0855a17
87f0800abc7b
87f0800abc7b
87f0800abc7b
87f0800abc7b
87f0800abc7b
87f0800abc7b
8ecc6b8229a5
7e3d89d9d3a2
cf51bbfb120e
8ecc6b8229a5
fa6ba6727475
fa6ba6727475
8ecc6b8229a5
260a7a01b054
fa6ba6727475
8ecc6b8229a5
cf51bbfb120e
8ecc6b8229a5
10b4e34841a4
8ecc6b8229a5
8ecc6b8229a5
54687aa00724
92a4f7c496a5
7e3d89d9d3a2
d3ac7491a5c8
0f87c784756e
307ec693bdf2
307ec693bdf2
307ec693bdf2
0f87c784756e
307ec693bdf2
2ccb32ddcfd7
fa6ba6727475
2ccb32ddcfd7
2ccb32ddcfd7
2ccb32ddcfd7
92a4f7c496a5
92a4f7c496a5
8898a79ac628
92a4f7c496a5
2ccb32ddcfd7
10b4e34841a4
92a4f7c496a5
8898a79ac628
8ecc6b8229a5
0f87c784756e
7e3d89d9d3a2
d3ac7491a5c8
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
10b4e34841a4
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
0f87c784756e
8ecc6b8229a5
7e3d89d9d3a2
d3ac7491a5c8
8ecc6b8229a5
fa6ba6727475
fa6ba6727475
fa6ba6727475
8ecc6b8229a5
8ecc6b8229a5
7e3d89d9d3a2
d2d35cf2b351
8ecc6b8229a5
d4b6c8541bd9
fa6ba6727475
fa6ba6727475
fa6ba6727475
d4b6c8541bd9
d4b6c8541bd9
d4b6c8541bd9
d4b6c8541bd9
fa6ba6727475
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
8ecc6b8229a5
7e3d89d9d3a2
d3ac7491a5c8
cf51bbfb120e
fa6ba6727475
fa6ba6727475
fa6ba6727475
1cf94aadabdc
1cf94aadabdc
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
7e3d89d9d3a2
# -*- coding: utf-8 -*-
"""
    rhodecode.model.users_group
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    user group model for RhodeCode

    :created_on: Oct 1, 2011
    :author: nvinot
    :copyright: (C) 2011-2011 Nicolas Vinot <aeris@imirhil.fr>
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
    :license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import traceback

from rhodecode.model import BaseModel
from rhodecode.model.db import UserGroupMember, UserGroup,\
    UserGroupRepoToPerm, Permission, UserGroupToPerm, User, UserUserGroupToPerm
from rhodecode.lib.exceptions import UserGroupsAssignedException

log = logging.getLogger(__name__)


class UserGroupModel(BaseModel):

    cls = UserGroup

    def _get_user_group(self, users_group):
        """
        Resolve ``users_group`` to a UserGroup by delegating to
        ``self._get_instance`` with ``UserGroup.get_by_group_name`` as the
        lookup callback.

        :param users_group: value identifying the user group; presumably a
            UserGroup instance, users_group_id or users_group_name
            (TODO confirm against ``_get_instance``)
        """
        lookup_by_name = UserGroup.get_by_group_name
        return self._get_instance(UserGroup, users_group,
                                  callback=lookup_by_name)

    def _create_default_perms(self, user_group):
        """
        Build the permission object that links the default user to
        ``user_group``. The object is only constructed and returned here;
        this method does not add it to the session.

        The permission used is the first ``usergroup.*`` permission found on
        the default user, with ``usergroup.read`` as fallback when the default
        user has none.

        :param user_group: user group the permission object is attached to
        :returns: new UserUserGroupToPerm object
        """
        default_user = User.get_default_user()
        usergroup_perm_names = (
            entry.permission.permission_name
            for entry in default_user.user_perms
            if entry.permission.permission_name.startswith('usergroup.')
        )
        # first match wins, same as breaking out of a loop on the first hit
        perm_name = next(usergroup_perm_names, 'usergroup.read')

        default_user_perm = UserUserGroupToPerm()
        default_user_perm.permission = Permission.get_by_key(perm_name)

        default_user_perm.user_group = user_group
        default_user_perm.user_id = default_user.user_id
        return default_user_perm

    def _update_permissions(self, user_group, perms_new=None,
                            perms_updates=None):
        """
        Apply permission changes on ``user_group``.

        Entries from ``perms_updates`` are processed first, then the entries
        from ``perms_new``. Each entry is a ``(member, perm, member_type)``
        triple: ``member_type == 'user'`` is routed to
        ``grant_user_permission`` (which creates or updates the entry), any
        other member type is routed to ``grant_users_group_permission``.

        :param user_group: user group the permissions are set on
        :param perms_new: iterable of triples for new permissions, or None
        :param perms_updates: iterable of triples for changed permissions,
            or None
        """
        new_entries = perms_new or []
        changed_entries = perms_updates or []

        # existing permissions are updated before new ones are set
        for batch in (changed_entries, new_entries):
            for member, perm, member_type in batch:
                if member_type == 'user':
                    self.grant_user_permission(
                        user_group=user_group, user=member, perm=perm
                    )
                else:
                    self.grant_users_group_permission(
                        user_group=user_group, group_name=member, perm=perm
                    )

    def get(self, users_group_id, cache=False):
        """
        Fetch a user group through ``UserGroup.get``.

        :param users_group_id: identifier handed to ``UserGroup.get``
        :param cache: accepted but not used by this method
        """
        user_group = UserGroup.get(users_group_id)
        return user_group

    def get_group(self, users_group):
        """
        Public wrapper around ``_get_user_group``.

        :param users_group: value identifying the user group, passed on
            unchanged
        """
        user_group = self._get_user_group(users_group)
        return user_group

    def get_by_name(self, name, cache=False, case_insensitive=False):
        """
        Look up a user group by name via ``UserGroup.get_by_group_name``.

        :param name: user group name
        :param cache: passed through as the second positional argument
        :param case_insensitive: passed through as the third positional
            argument
        """
        lookup_args = (name, cache, case_insensitive)
        return UserGroup.get_by_group_name(*lookup_args)

    def create(self, name, owner, active=True):
        """
        Create a new user group and its initial permissions.

        The group, the default user's permission on it and an
        ``usergroup.admin`` permission for the owner are added to the session
        (``self.sa``); no commit is issued here. Any exception is logged with
        its traceback and re-raised.

        :param name: name of the new user group
        :param owner: owner of the group, resolved with ``self._get_user``
        :param active: value stored in ``users_group_active``
        :returns: the new UserGroup object
        """
        try:
            user_group = UserGroup()
            user_group.user = self._get_user(owner)
            user_group.users_group_name = name
            user_group.users_group_active = active
            self.sa.add(user_group)

            # permission of the default user on the new group
            default_user_perm = self._create_default_perms(user_group)
            self.sa.add(default_user_perm)

            # the owner gets admin rights on the group
            self.grant_user_permission(user_group=user_group,
                                       user=owner, perm='usergroup.admin')
        except Exception:
            log.error(traceback.format_exc())
            raise
        return user_group

    def update(self, users_group, form_data):
        """
        Update a user group from form data.

        Every key/value pair of ``form_data`` is set as an attribute on the
        group. The ``users_group_members`` key additionally replaces the
        membership list: current members are cleared and flushed, then one
        UserGroupMember is created per unique user id found in the value.
        Any exception is logged with its traceback and re-raised.

        :param users_group: user group to update, resolved with
            ``_get_user_group``
        :param form_data: dict mapping attribute names to new values
        """
        try:
            group = self._get_user_group(users_group)

            for attr_name, value in form_data.items():
                if attr_name == 'users_group_members':
                    # clear the old membership and flush before building the
                    # new one (presumably so old rows are gone first)
                    group.members = []
                    self.sa.flush()

                    new_members = []
                    if value:
                        if isinstance(value, basestring):
                            # a single user id given as a plain string
                            value = [value]
                        new_members = [
                            UserGroupMember(group.users_group_id, user_id)
                            for user_id in set(value)
                        ]
                    group.members = new_members
                # NOTE(review): this also runs for 'users_group_members', so
                # the (possibly list-wrapped) form value is stored under that
                # attribute name as well
                setattr(group, attr_name, value)

            self.sa.add(group)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def delete(self, users_group, force=False):
        """
        Delete a user group.

        Unless ``force`` is set, the deletion is refused with
        UserGroupsAssignedException when UserGroupRepoToPerm entries still
        reference the group. Any exception is logged with its traceback and
        re-raised.

        :param users_group: user group to delete, resolved with
            ``_get_user_group``
        :param force: delete even if the group is still assigned
        """
        try:
            group = self._get_user_group(users_group)

            # repository permission entries that still point at this group
            repo_perms_query = UserGroupRepoToPerm.query()
            repo_perms = repo_perms_query.filter(
                UserGroupRepoToPerm.users_group == group).all()

            if repo_perms and not force:
                message = 'RepoGroup assigned to %s' % repo_perms
                raise UserGroupsAssignedException(message)

            self.sa.delete(group)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def add_user_to_group(self, users_group, user):
        """
        Add ``user`` as a member of ``users_group``.

        :param users_group: user group, resolved with ``_get_user_group``
        :param user: user to add, resolved with ``self._get_user``
        :returns: True when the user already is a member, otherwise the newly
            created UserGroupMember (added to the session)
        """
        group = self._get_user_group(users_group)
        user = self._get_user(user)

        # nothing to do when a membership for this user already exists
        already_member = any(member.user.user_id == user.user_id
                             for member in group.members)
        if already_member:
            return True

        try:
            membership = UserGroupMember()
            membership.user = user
            membership.users_group = group

            group.members.append(membership)
            user.group_member.append(membership)

            self.sa.add(membership)
            return membership
        except Exception:
            log.error(traceback.format_exc())
            raise

    def remove_user_from_group(self, users_group, user):
        """
        Remove ``user`` from ``users_group``.

        :param users_group: user group, resolved with ``_get_user_group``
        :param user: user to remove, resolved with ``self._get_user``
        :returns: True when a membership was found and marked for deletion,
            False when the user is not a member of the group
        """
        group = self._get_user_group(users_group)
        user = self._get_user(user)

        # first membership entry that belongs to this user, if any
        membership = next(
            (member for member in group.members
             if member.user.user_id == user.user_id),
            None
        )
        if not membership:
            # User isn't in that group
            return False

        try:
            self.sa.delete(membership)
            return True
        except Exception:
            log.error(traceback.format_exc())
            raise

    def has_perm(self, users_group, perm):
        """
        Tell whether a UserGroupToPerm entry exists for the given user group
        and permission.

        :param users_group: user group, resolved with ``_get_user_group``
        :param perm: permission, resolved with ``self._get_perm``
        :returns: True if the query yields an entry, False otherwise
        """
        group = self._get_user_group(users_group)
        permission = self._get_perm(perm)

        query = UserGroupToPerm.query()
        query = query.filter(UserGroupToPerm.users_group == group)
        query = query.filter(UserGroupToPerm.permission == permission)
        granted = query.scalar()
        return granted is not None

    def grant_perm(self, users_group, perm):
        """
        Grant ``perm`` to ``users_group`` unless it is granted already.

        A new UserGroupToPerm object is added to the session only when no
        matching entry is found.

        :param users_group: user group, resolved with ``_get_user_group``
        :param perm: permission, resolved with ``self._get_perm``
        """
        group = self._get_user_group(users_group)
        permission = self._get_perm(perm)

        query = UserGroupToPerm.query()
        query = query.filter(UserGroupToPerm.users_group == group)
        query = query.filter(UserGroupToPerm.permission == permission)
        existing = query.scalar()

        if not existing:
            # not granted yet, create the link
            group_perm = UserGroupToPerm()
            group_perm.users_group = group
            group_perm.permission = permission
            self.sa.add(group_perm)

    def revoke_perm(self, users_group, perm):
        """
        Revoke ``perm`` from ``users_group``; does nothing when no matching
        UserGroupToPerm entry is found.

        :param users_group: user group, resolved with ``_get_user_group``
        :param perm: permission, resolved with ``self._get_perm``
        """
        group = self._get_user_group(users_group)
        permission = self._get_perm(perm)

        query = UserGroupToPerm.query()
        query = query.filter(UserGroupToPerm.users_group == group)
        query = query.filter(UserGroupToPerm.permission == permission)
        group_perm = query.scalar()

        if not group_perm:
            return
        self.sa.delete(group_perm)

    def grant_user_permission(self, user_group, user, perm):
        """
        Set the permission a user has on a user group.

        An existing UserUserGroupToPerm entry for the (user, user group) pair
        is reused and updated; otherwise a new one is created. The entry is
        added to the session in both cases.

        :param user_group: Instance of UserGroup, users_group_id,
            or users_group_name
        :param user: Instance of User, user_id or username
        :param perm: Instance of Permission, or permission_name
        """
        user_group = self._get_user_group(user_group)
        user = self._get_user(user)
        permission = self._get_perm(perm)

        # look for an entry already linking this user and group
        lookup = self.sa.query(UserUserGroupToPerm)
        lookup = lookup.filter(UserUserGroupToPerm.user == user)
        lookup = lookup.filter(UserUserGroupToPerm.user_group == user_group)
        user_perm = lookup.scalar()

        if user_perm is None:
            # nothing granted so far, start a fresh entry
            user_perm = UserUserGroupToPerm()

        user_perm.user_group = user_group
        user_perm.user = user
        user_perm.permission = permission
        self.sa.add(user_perm)

        message = 'Granted perm %s to %s on %s' % (perm, user, user_group)
        log.debug(message)

    def revoke_user_permission(self, user_group, user):
        """
        Revoke the permission a user has on a user group; does nothing when
        no UserUserGroupToPerm entry exists for the pair.

        :param user_group: user group, resolved with ``_get_user_group``
        :param user: user, resolved with ``self._get_user``
        """
        user_group = self._get_user_group(user_group)
        user = self._get_user(user)

        lookup = self.sa.query(UserUserGroupToPerm)
        lookup = lookup.filter(UserUserGroupToPerm.user == user)
        lookup = lookup.filter(UserUserGroupToPerm.user_group == user_group)
        user_perm = lookup.scalar()

        if not user_perm:
            return

        self.sa.delete(user_perm)
        message = 'Revoked perm on %s on %s' % (user_group, user)
        log.debug(message)

    def grant_users_group_permission(self, user_group, group_name, perm):
        """
        Placeholder for granting a user group a permission on another user
        group; not implemented, always raises NotImplementedError.

        Note that ``_update_permissions`` routes every entry whose member
        type is not ``'user'`` here.

        :param user_group: target user group (unused)
        :param group_name: name of the group receiving the permission (unused)
        :param perm: permission to grant (unused)
        """
        raise NotImplementedError()

    def revoke_users_group_permission(self, user_group, group_name):
        """
        Placeholder for revoking a user group's permission on another user
        group; not implemented, always raises NotImplementedError.

        :param user_group: target user group (unused)
        :param group_name: name of the group losing the permission (unused)
        """
        raise NotImplementedError()