Changeset - 2045d30919e6
[Not reviewed]
default
0 7 0
Mads Kiilerich - 6 years ago 2020-04-11 18:53:10
mads@kiilerich.com
Grafted from: e5a8ca52c8fa
db: clarify that DEFAULT_USER just is DEFAULT_USER_NAME
7 files changed with 22 insertions and 23 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/db_manage.py
Show inline comments
 
@@ -296,124 +296,124 @@ class DbManage(object):
 
                 'All repositories in that path will be added automatically:'
 
            )
 
        else:
 
            path = test_repo_path
 
        path_ok = True
 

	
 
        # check proper dir
 
        if not os.path.isdir(path):
 
            path_ok = False
 
            log.error('Given path %s is not a valid directory', path)
 

	
 
        elif not os.path.isabs(path):
 
            path_ok = False
 
            log.error('Given path %s is not an absolute path', path)
 

	
 
        # check if path is at least readable.
 
        if not os.access(path, os.R_OK):
 
            path_ok = False
 
            log.error('Given path %s is not readable', path)
 

	
 
        # check write access, warn user about non writeable paths
 
        elif not os.access(path, os.W_OK) and path_ok:
 
            log.warning('No write permission to given path %s', path)
 
            if not self._ask_ok('Given path %s is not writeable, do you want to '
 
                          'continue with read only mode ? [y/n]' % (path,)):
 
                log.error('Canceled by user')
 
                sys.exit(-1)
 

	
 
        if retries == 0:
 
            sys.exit('max retries reached')
 
        if not path_ok:
 
            if _path is not None:
 
                sys.exit('Invalid repo path: %s' % _path)
 
            retries -= 1
 
            return self.prompt_repo_root_path(test_repo_path, retries) # recursing!!!
 

	
 
        real_path = os.path.normpath(os.path.realpath(path))
 

	
 
        if real_path != os.path.normpath(path):
 
            log.warning('Using normalized path %s instead of %s', real_path, path)
 

	
 
        return real_path
 

	
 
    def create_settings(self, repo_root_path):
 
        ui_config = [
 
            ('paths', '/', repo_root_path, True),
 
            #('phases', 'publish', 'false', False)
 
            ('hooks', Ui.HOOK_UPDATE, 'hg update >&2', False),
 
            ('hooks', Ui.HOOK_REPO_SIZE, 'python:kallithea.lib.hooks.repo_size', True),
 
            ('extensions', 'largefiles', '', True),
 
            ('largefiles', 'usercache', os.path.join(repo_root_path, '.cache', 'largefiles'), True),
 
            ('extensions', 'hgsubversion', '', False),
 
            ('extensions', 'hggit', '', False),
 
        ]
 
        for ui_section, ui_key, ui_value, ui_active in ui_config:
 
            ui_conf = Ui(
 
                ui_section=ui_section,
 
                ui_key=ui_key,
 
                ui_value=ui_value,
 
                ui_active=ui_active)
 
            self.sa.add(ui_conf)
 

	
 
        settings = [
 
            ('realm', 'Kallithea', 'unicode'),
 
            ('title', '', 'unicode'),
 
            ('ga_code', '', 'unicode'),
 
            ('show_public_icon', True, 'bool'),
 
            ('show_private_icon', True, 'bool'),
 
            ('stylify_metalabels', False, 'bool'),
 
            ('dashboard_items', 100, 'int'), # TODO: call it page_size
 
            ('admin_grid_items', 25, 'int'),
 
            ('show_version', True, 'bool'),
 
            ('use_gravatar', True, 'bool'),
 
            ('gravatar_url', User.DEFAULT_GRAVATAR_URL, 'unicode'),
 
            ('clone_uri_tmpl', Repository.DEFAULT_CLONE_URI, 'unicode'),
 
            ('clone_ssh_tmpl', Repository.DEFAULT_CLONE_SSH, 'unicode'),
 
        ]
 
        for key, val, type_ in settings:
 
            sett = Setting(key, val, type_)
 
            self.sa.add(sett)
 

	
 
        self.create_auth_plugin_options()
 
        self.create_default_options()
 

	
 
        log.info('Populated Ui and Settings defaults')
 

	
 
    def create_user(self, username, password, email='', admin=False):
 
        log.info('creating user %s', username)
 
        UserModel().create_or_update(username, password, email,
 
                                     firstname='Kallithea', lastname='Admin',
 
                                     active=True, admin=admin,
 
                                     extern_type=User.DEFAULT_AUTH_TYPE)
 

	
 
    def create_default_user(self):
 
        log.info('creating default user')
 
        # create default user for handling default permissions.
 
        user = UserModel().create_or_update(username=User.DEFAULT_USER,
 
        user = UserModel().create_or_update(username=User.DEFAULT_USER_NAME,
 
                                            password=str(uuid.uuid1())[:20],
 
                                            email='anonymous@kallithea-scm.org',
 
                                            firstname='Anonymous',
 
                                            lastname='User')
 
        # based on configuration options activate/deactivate this user which
 
        # controls anonymous access
 
        if self.cli_args.get('public_access') is False:
 
            log.info('Public access disabled')
 
            user.active = False
 
            Session().commit()
 

	
 
    def create_permissions(self):
 
        """
 
        Creates all permissions defined in the system
 
        """
 
        # module.(access|create|change|delete)_[name]
 
        # module.(none|read|write|admin)
 
        log.info('creating permissions')
 
        PermissionModel().create_permissions()
 

	
 
    def populate_default_permissions(self):
 
        """
 
        Populate default permissions. It will create only the default
 
        permissions that are missing, and not alter already defined ones
 
        """
 
        log.info('creating default user permissions')
 
        PermissionModel().create_default_permissions(user=User.DEFAULT_USER)
 
        PermissionModel().create_default_permissions(user=User.DEFAULT_USER_NAME)
kallithea/model/db.py
Show inline comments
 
@@ -305,422 +305,422 @@ class Setting(Base, BaseDbModel):
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import pkg_resources
 
        import platform
 
        from kallithea.lib.utils import check_git_version
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': platform.platform(),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': str(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        Index('ui_ui_section_ui_key_idx', 'ui_section', 'ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.update'
 
    HOOK_REPO_SIZE = 'changegroup.repo_size'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.order_by(cls.ui_section, cls.ui_key)
 
        return q.all()
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.order_by(cls.ui_section, cls.ui_key)
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s %s.%s=%r>' % (
 
            self.__class__.__name__,
 
            self.ui_section, self.ui_key, self.ui_value)
 

	
 

	
 
class User(Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER = 'default'
 
    DEFAULT_USER_NAME = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 
    ssh_keys = relationship('UserSshKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @hybrid_property
 
    def is_default_user(self):
 
        return self.username == User.DEFAULT_USER
 
        return self.username == User.DEFAULT_USER_NAME
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return ext_json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(User, cls).guess_instance(value, User.get_by_username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseDbModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if not allow_default and user.is_default_user:
 
            raise DefaultUserException()
 
        return user
 

	
 
    @classmethod
 
    def get_by_username_or_email(cls, username_or_email, case_insensitive=True):
 
        """
 
        For anything that looks like an email address, look up by the email address (matching
 
        case insensitively).
 
        For anything else, try to look up by the user name.
 

	
 
        This assumes no normal username can have '@' symbol.
 
        """
 
        if '@' in username_or_email:
 
            return User.get_by_email(username_or_email)
 
        else:
 
            return User.get_by_username(username_or_email, case_insensitive=case_insensitive)
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(sqlalchemy.func.lower(cls.username) == sqlalchemy.func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            # fallback to additional keys
 
            _res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
 
            if _res:
 
                res = _res.user
 
        if res is None or not res.active or res.is_default_user:
 
            return None
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(sqlalchemy.func.lower(cls.email) == sqlalchemy.func.lower(email))
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(sqlalchemy.func.lower(UserEmailMap.email) == sqlalchemy.func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls):
 
        user = User.get_by_username(User.DEFAULT_USER)
 
        user = User.get_by_username(User.DEFAULT_USER_NAME)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            emails=user.emails,
 
            active=user.active,
 
            admin=user.admin,
 
        )
 
        if details:
 
            data.update(dict(
 
                extern_type=user.extern_type,
 
                extern_name=user.extern_name,
 
                api_key=user.api_key,
 
                api_keys=user.api_keys,
 
                last_login=user.last_login,
 
                ip_addresses=user.ip_addresses
 
                ))
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
            full_name=self.full_name,
 
            full_name_or_username=self.full_name_or_username,
 
            short_contact=self.short_contact,
 
            full_contact=self.full_contact
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 

	
 
class UserApiKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_api_keys'
 
    __table_args__ = (
 
        Index('uak_api_key_idx', 'api_key'),
 
        Index('uak_api_key_expires_idx', 'api_key', 'expires'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_api_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    api_key = Column(String(255), nullable=False, unique=True)
 
    description = Column(UnicodeText(), nullable=False)
 
    expires = Column(Float(53), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.expires != -1) & (time.time() > self.expires)
 

	
 

	
 
class UserEmailMap(Base, BaseDbModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        Index('uem_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    email_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _email = Column("email", String(255), nullable=False, unique=True)
 
    user = relationship('User')
 

	
 
    @validates('_email')
 
    def validate_email(self, key, email):
 
        # check if this email is not main one
 
        main_email = Session().query(User).filter(User.email == email).scalar()
 
        if main_email is not None:
 
            raise AttributeError('email %s is present is user table' % email)
 
        return email
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 

	
 
class UserIpMap(Base, BaseDbModel):
 
    __tablename__ = 'user_ip_map'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'ip_addr'),
kallithea/model/permission.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.permission
 
~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
permissions model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 20, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import traceback
 

	
 
from sqlalchemy.exc import DatabaseError
 

	
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.model.db import Permission, Session, User, UserRepoGroupToPerm, UserRepoToPerm, UserToPerm, UserUserGroupToPerm
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PermissionModel(object):
 
    """
 
    Permissions model for Kallithea
 
    """
 

	
 
    def create_permissions(self):
 
        """
 
        Create permissions for whole system
 
        """
 
        for p in Permission.PERMS:
 
            if not Permission.get_by_key(p[0]):
 
                new_perm = Permission()
 
                new_perm.permission_name = p[0]
 
                Session().add(new_perm)
 

	
 
    def create_default_permissions(self, user, force=False):
 
        """
 
        Create missing default permissions for user. If force is set, the default
 
        permissions for the user are reset, otherwise only missing permissions are
 
        created.
 

	
 
        :param user:
 
        """
 
        user = User.guess_instance(user)
 

	
 
        def _make_perm(perm):
 
            new_perm = UserToPerm()
 
            new_perm.user = user
 
            new_perm.permission = Permission.get_by_key(perm)
 
            return new_perm
 

	
 
        def _get_group(perm_name):
 
            return '.'.join(perm_name.split('.')[:1])
 

	
 
        perms = UserToPerm.query().filter(UserToPerm.user == user).all()
 
        defined_perms_groups = set(_get_group(x.permission.permission_name) for x in perms)
 
        log.debug('GOT ALREADY DEFINED:%s', perms)
 
        DEFAULT_PERMS = Permission.DEFAULT_USER_PERMISSIONS
 

	
 
        if force:
 
            for perm in perms:
 
                Session().delete(perm)
 
            Session().commit()
 
            defined_perms_groups = []
 
        # For every default permission that needs to be created, we check if
 
        # its group is already defined. If it's not, we create default permission.
 
        for perm_name in DEFAULT_PERMS:
 
        for perm_name in Permission.DEFAULT_USER_PERMISSIONS:
 
            gr = _get_group(perm_name)
 
            if gr not in defined_perms_groups:
 
                log.debug('GR:%s not found, creating permission %s',
 
                          gr, perm_name)
 
                new_perm = _make_perm(perm_name)
 
                Session().add(new_perm)
 

	
 
    def update(self, form_result):
 
        perm_user = User.get_by_username(username=form_result['perm_user_name'])
 

	
 
        try:
 
            # stage 1 set anonymous access
 
            if perm_user.is_default_user:
 
                perm_user.active = str2bool(form_result['anonymous'])
 

	
 
            # stage 2 reset defaults and set them from form data
 
            def _make_new(usr, perm_name):
 
                log.debug('Creating new permission:%s', perm_name)
 
                new = UserToPerm()
 
                new.user = usr
 
                new.permission = Permission.get_by_key(perm_name)
 
                return new
 
            # clear current entries, to make this function idempotent
 
            # it will fix even if we define more permissions or permissions
 
            # are somehow missing
 
            u2p = UserToPerm.query() \
 
                .filter(UserToPerm.user == perm_user) \
 
                .all()
 
            for p in u2p:
 
                Session().delete(p)
 
            # create fresh set of permissions
 
            for def_perm_key in ['default_repo_perm',
 
                                 'default_group_perm',
 
                                 'default_user_group_perm',
 
                                 'default_repo_create',
 
                                 'create_on_write', # special case for create repos on write access to group
 
                                 #'default_repo_group_create', # not implemented yet
 
                                 'default_user_group_create',
 
                                 'default_fork',
 
                                 'default_register',
 
                                 'default_extern_activate']:
 
                p = _make_new(perm_user, form_result[def_perm_key])
 
                Session().add(p)
 

	
 
            # stage 3 update all default permissions for repos if checked
 
            if form_result['overwrite_default_repo']:
 
                _def_name = form_result['default_repo_perm'].split('repository.')[-1]
 
                _def = Permission.get_by_key('repository.' + _def_name)
 
                # repos
 
                for r2p in UserRepoToPerm.query() \
 
                               .filter(UserRepoToPerm.user == perm_user) \
 
                               .all():
 

	
 
                    # don't reset PRIVATE repositories
 
                    if not r2p.repository.private:
 
                        r2p.permission = _def
 

	
 
            if form_result['overwrite_default_group']:
 
                _def_name = form_result['default_group_perm'].split('group.')[-1]
 
                # groups
 
                _def = Permission.get_by_key('group.' + _def_name)
 
                for g2p in UserRepoGroupToPerm.query() \
 
                               .filter(UserRepoGroupToPerm.user == perm_user) \
 
                               .all():
 
                    g2p.permission = _def
 

	
 
            if form_result['overwrite_default_user_group']:
 
                _def_name = form_result['default_user_group_perm'].split('usergroup.')[-1]
 
                # groups
 
                _def = Permission.get_by_key('usergroup.' + _def_name)
 
                for g2p in UserUserGroupToPerm.query() \
 
                               .filter(UserUserGroupToPerm.user == perm_user) \
 
                               .all():
 
                    g2p.permission = _def
 

	
 
            Session().commit()
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
 
            Session().rollback()
 
            raise
kallithea/model/validators.py
Show inline comments
 
@@ -470,202 +470,202 @@ def CanWriteGroup(old_data=None):
 
            # do check if we changed the value, there's a case that someone got
 
            # revoked write permissions to a repository, he still created, we
 
            # don't need to check permission if he didn't change the value of
 
            # groups in form box
 
            if value_changed or new:
 
                # parent group need to be existing
 
                if gr and forbidden:
 
                    msg = self.message('permission_denied', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 
                ## check if we can write to root location !
 
                elif gr is None and not can_create_repos():
 
                    msg = self.message('permission_denied_root', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create a group in this location")
 
        }
 

	
 
        def to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                # we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and not can_create_in_root
 
            forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = self.message('permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'repo_group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 
    elif type_ == 'user_group':
 
        EMPTY_PERM = 'usergroup.none'
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _('This username or user group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = OrderedSet()
 
            perms_new = OrderedSet()
 
            # build a list of permission to update and new permission to create
 

	
 
            # CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
 
            new_perms_group = defaultdict(dict)
 
            for k, v in value.copy().items():
 
                if k.startswith('perm_new_member'):
 
                    del value[k]
 
                    _type, part = k.split('perm_new_member_')
 
                    args = part.split('_')
 
                    if len(args) == 1:
 
                        new_perms_group[args[0]]['perm'] = v
 
                    elif len(args) == 2:
 
                        _key, pos = args
 
                        new_perms_group[pos][_key] = v
 

	
 
            # fill new permissions in order of how they were added
 
            for k in sorted(new_perms_group, key=lambda k: int(k)):
 
                perm_dict = new_perms_group[k]
 
                new_member = perm_dict.get('name')
 
                new_perm = perm_dict.get('perm')
 
                new_type = perm_dict.get('type')
 
                if new_member and new_perm and new_type:
 
                    perms_new.add((new_member, new_perm, new_type))
 

	
 
            for k, v in value.items():
 
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    member_name = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
 
                    if member == User.DEFAULT_USER:
 
                    if member_name == User.DEFAULT_USER_NAME:
 
                        if str2bool(value.get('repo_private')):
 
                            # set none for default when updating to
 
                            # private repo protects against form manipulation
 
                            v = EMPTY_PERM
 
                    perms_update.add((member, v, t))
 
                    perms_update.add((member_name, v, t))
 

	
 
            value['perms_updates'] = list(perms_update)
 
            value['perms_new'] = list(perms_new)
 

	
 
            # update permissions
 
            for k, v, t in perms_new:
 
                try:
 
                    if t == 'user':
 
                        _user_db = User.query() \
 
                            .filter(User.active == True) \
 
                            .filter(User.username == k).one()
 
                    if t == 'users_group':
 
                        _user_db = UserGroup.query() \
 
                            .filter(UserGroup.users_group_active == True) \
 
                            .filter(UserGroup.users_group_name == k).one()
 

	
 
                except Exception as e:
 
                    log.warning('Error validating %s permission %s', t, k)
 
                    msg = self.message('perm_new_member_type', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(perm_new_member_name=msg)
 
                    )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidSettings():
 
    class _validator(formencode.validators.FancyValidator):
 
        def _convert_to_python(self, value, state):
 
            # settings  form for users that are not admin
 
            # can't edit certain parameters, it's extra backup if they mangle
 
            # with forms
 

	
 
            forbidden_params = [
 
                'user', 'repo_type',
 
                'repo_enable_downloads', 'repo_enable_statistics'
 
            ]
 

	
 
            for param in forbidden_params:
 
                if param in value:
 
                    del value[param]
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            pass
 
    return _validator
 

	
 

	
 
def ValidPath():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_path': _('This is not a valid path')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if not os.path.isdir(value):
 
                msg = self.message('invalid_path', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(paths_root_path=msg)
 
                )
 
    return _validator
 

	
 

	
 
def UniqSystemEmail(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'email_taken': _('This email address is already in use')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            return value.lower()
 

	
 
        def _validate_python(self, value, state):
 
            if (old_data.get('email') or '').lower() != value:
 
                user = User.get_by_email(value)
 
                if user is not None:
 
                    msg = self.message('email_taken', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(email=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidSystemEmail():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'non_existing_email': _('Email address "%(email)s" not found')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            return value.lower()
 

	
 
        def _validate_python(self, value, state):
 
            user = User.get_by_email(value)
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -578,208 +578,208 @@ class _BaseTestApi(object):
 
        new_group = 'some_new_group'
 
        make_user_group(new_group)
 
        RepoModel().grant_user_group_permission(repo=self.REPO,
 
                                                group_name=new_group,
 
                                                perm='repository.read')
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 
        assert "tags" not in response.json['result']
 
        assert 'pull_requests' not in response.json['result']
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {'name': user.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {'name': user_group.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_user_group(new_group)
 

	
 
        id_, params = _build_data(self.apikey, 'get_repo', repoid=self.REPO,
 
                                  with_revision_names=True,
 
                                  with_pullrequests=True)
 
        response = api_call(self, params)
 
        assert "v0.2.0" in response.json['result']['tags']
 
        assert 'pull_requests' in response.json['result']
 

	
 
    @base.parametrize('grant_perm', [
 
        ('repository.admin'),
 
        ('repository.write'),
 
        ('repository.read'),
 
    ])
 
    def test_api_get_repo_by_non_admin(self, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        assert 2 == len(repo.repo_to_perm)
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user_obj = user.user
 
            user_data = {'name': user_obj.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group_obj = user_group.users_group
 
            user_group_data = {'name': user_group_obj.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=User.DEFAULT_USER,
 
                                          user=User.DEFAULT_USER_NAME,
 
                                          perm='repository.none')
 
        try:
 
            RepoModel().grant_user_permission(repo=self.REPO,
 
                                              user=self.TEST_USER_LOGIN,
 
                                              perm='repository.none')
 

	
 
            id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                      repoid=self.REPO)
 
            response = api_call(self, params)
 

	
 
            expected = 'repository `%s` does not exist' % (self.REPO)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().grant_user_permission(repo=self.REPO,
 
                                              user=User.DEFAULT_USER,
 
                                              user=User.DEFAULT_USER_NAME,
 
                                              perm='repository.read')
 

	
 
    def test_api_get_repo_that_doesn_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid='no-such-repo')
 
        response = api_call(self, params)
 

	
 
        ret = 'repository `%s` does not exist' % 'no-such-repo'
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos(self):
 
        id_, params = _build_data(self.apikey, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        expected = jsonify([
 
            repo.get_api_data()
 
            for repo in Repository.query()
 
        ])
 

	
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        expected = jsonify([
 
            repo.get_api_data()
 
            for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN)
 
        ])
 

	
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @base.parametrize('name,ret_type', [
 
        ('all', 'all'),
 
        ('dirs', 'dirs'),
 
        ('files', 'files'),
 
    ])
 
    def test_api_get_repo_nodes(self, name, ret_type):
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_revisions(self):
 
        rev = 'i-dont-exist'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_path(self):
 
        rev = 'tip'
 
        path = '/idontexits'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_ret_type(self):
 
        rev = 'tip'
 
        path = '/'
 
        ret_type = 'error'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        expected = ('ret_type must be one of %s'
 
                    % (','.join(sorted(['files', 'dirs', 'all']))))
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @base.parametrize('name,ret_type,grant_perm', [
 
        ('all', 'all', 'repository.write'),
 
        ('dirs', 'dirs', 'repository.admin'),
 
        ('files', 'files', 'repository.read'),
 
    ])
 
    def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
@@ -1235,206 +1235,206 @@ class _BaseTestApi(object):
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin_no_permission(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                      repoid=repo_name, )
 
            response = api_call(self, params)
 
            expected = 'repository `%s` does not exist' % (repo_name)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_exception_occurred(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            with mock.patch.object(RepoModel, 'delete', crash):
 
                id_, params = _build_data(self.apikey, 'delete_repo',
 
                                          repoid=repo_name, )
 
                response = api_call(self, params)
 

	
 
                expected = 'failed to delete repository `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_fork_repo(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    @base.parametrize('fork_name', [
 
        'api-repo-fork',
 
        '%s/api-repo-fork' % TEST_REPO_GROUP,
 
    ])
 
    def test_api_fork_repo_non_admin(self, fork_name):
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_specify_owner(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_no_permission_to_fork(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=User.DEFAULT_USER,
 
                                          user=User.DEFAULT_USER_NAME,
 
                                          perm='repository.none')
 
        try:
 
            fork_name = 'api-repo-fork'
 
            id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                      repoid=self.REPO,
 
                                      fork_name=fork_name,
 
            )
 
            response = api_call(self, params)
 
            expected = 'repository `%s` does not exist' % (self.REPO)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().grant_user_permission(repo=self.REPO,
 
                                              user=User.DEFAULT_USER,
 
                                              user=User.DEFAULT_USER_NAME,
 
                                              perm='repository.read')
 
            fixture.destroy_repo(fork_name)
 

	
 
    @base.parametrize('name,perm', [
 
        ('read', 'repository.read'),
 
        ('write', 'repository.write'),
 
        ('admin', 'repository.admin'),
 
    ])
 
    def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
 
        fork_name = 'api-repo-fork'
 
        # regardless of base repository permission, forking is disallowed
 
        # when repository creation is disabled
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=perm)
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'no permission to create repositories'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_unknown_owner(self):
 
        fork_name = 'api-repo-fork'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=owner,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_fork_repo_fork_exists(self):
 
        fork_name = 'api-repo-fork'
 
        fixture.create_fork(self.REPO, fork_name)
 

	
 
        try:
 
            fork_name = 'api-repo-fork'
 

	
 
            id_, params = _build_data(self.apikey, 'fork_repo',
 
                                      repoid=self.REPO,
 
                                      fork_name=fork_name,
 
                                      owner=base.TEST_USER_ADMIN_LOGIN,
 
            )
 
            response = api_call(self, params)
 

	
 
            expected = "fork `%s` already exist" % fork_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_repo_exists(self):
 
        fork_name = self.REPO
 

	
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = "repo `%s` already exist" % fork_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create_fork', crash)
 
    def test_api_fork_repo_exception_occurred(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
 
                                                               fork_name)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_group(self):
 
        id_, params = _build_data(self.apikey, 'get_user_group',
 
                                  usergroupid=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        user_group = UserGroupModel().get_group(TEST_USER_GROUP)
 
        members = []
 
        for user in user_group.members:
 
            user = user.user
 
            members.append(user.get_api_data())
 

	
 
        ret = user_group.get_api_data()
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
@@ -73,196 +73,196 @@ class _BaseTestCase(base.TestController)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name)
 
        Session().commit()
 

	
 
    def test_case_insensitivity(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                                 fixture._get_repo_create_params(repo_private=False,
 
                                                                 repo_name=repo_name,
 
                                                                 repo_type=self.REPO_TYPE,
 
                                                                 repo_description=description,
 
                                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        # try to create repo with swapped case
 
        swapped_repo_name = repo_name.swapcase()
 
        response = self.app.post(base.url('repos'),
 
                                 fixture._get_repo_create_params(repo_private=False,
 
                                                                 repo_name=swapped_repo_name,
 
                                                                 repo_type=self.REPO_TYPE,
 
                                                                 repo_description=description,
 
                                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        response.mustcontain('already exists')
 

	
 
        RepoModel().delete(repo_name)
 
        Session().commit()
 

	
 
    def test_create_in_group(self):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = 'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description='test',
 
                                     owner=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 

	
 
        repo_name = 'ingroup'
 
        repo_name_full = db.URL_SEP.join([group_name, repo_name])
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
 
        assert response.json == {'result': True}
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        assert new_repo.repo_name == repo_name_full
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        assert len(inherited_perms) == 1
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_in_group_without_needed_permissions(self):
 
        usr = self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
 
        # avoid spurious RepoGroup DetachedInstanceError ...
 
        session_csrf_secret_token = self.session_csrf_secret_token()
 
        # revoke
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
 
        user_model.revoke_perm(User.DEFAULT_USER_NAME, 'hg.create.repository')
 
        user_model.grant_perm(User.DEFAULT_USER_NAME, 'hg.create.none')
 
        user_model.revoke_perm(User.DEFAULT_USER_NAME, 'hg.fork.repository')
 
        user_model.grant_perm(User.DEFAULT_USER_NAME, 'hg.fork.none')
 

	
 
        # disable on regular user
 
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
 
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.none')
 
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
 
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
 
        Session().commit()
 

	
 
        ## create GROUP
 
        group_name = 'reg_sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description='test',
 
                                     owner=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 

	
 
        group_name_allowed = 'reg_sometest_allowed_%s' % self.REPO_TYPE
 
        gr_allowed = RepoGroupModel().create(group_name=group_name_allowed,
 
                                     group_description='test',
 
                                     owner=base.TEST_USER_REGULAR_LOGIN)
 
        Session().commit()
 

	
 
        repo_name = 'ingroup'
 
        repo_name_full = db.URL_SEP.join([group_name, repo_name])
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                _session_csrf_secret_token=session_csrf_secret_token))
 

	
 
        response.mustcontain('Invalid value')
 

	
 
        # user is allowed to create in this group
 
        repo_name = 'ingroup'
 
        repo_name_full = db.URL_SEP.join([group_name_allowed, repo_name])
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr_allowed.group_id,
 
                                                _session_csrf_secret_token=session_csrf_secret_token))
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
 
        assert response.json == {'result': True}
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        assert new_repo.repo_name == repo_name_full
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        assert len(inherited_perms) == 1
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        RepoGroupModel().delete(group_name_allowed)
 
        Session().commit()
 

	
 
    def test_create_in_group_inherit_permissions(self):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = 'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description='test',
 
                                     owner=base.TEST_USER_ADMIN_LOGIN)
 
        perm = Permission.get_by_key('repository.write')
 
        RepoGroupModel().grant_user_permission(gr, base.TEST_USER_REGULAR_LOGIN, perm)
 

	
 
        ## add repo permissions
 
        Session().commit()
 

	
 
@@ -470,172 +470,172 @@ class _BaseTestCase(base.TestController)
 
                                                owner=base.TEST_USER_ADMIN_LOGIN,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        self.checkSessionFlash(response,
 
                               msg='Repository %s updated successfully' % (self.REPO))
 
        assert Repository.get_by_repo_name(self.REPO).private == True
 

	
 
        # now the repo default permission should be None
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        assert len(perm), 1
 
        assert perm[0].permission.permission_name == 'repository.none'
 

	
 
        response = self.app.post(base.url('update_repo', repo_name=self.REPO),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=self.REPO,
 
                                                repo_type=self.REPO_TYPE,
 
                                                owner=base.TEST_USER_ADMIN_LOGIN,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        self.checkSessionFlash(response,
 
                               msg='Repository %s updated successfully' % (self.REPO))
 
        assert Repository.get_by_repo_name(self.REPO).private == False
 

	
 
        # we turn off private now the repo default permission should stay None
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        assert len(perm), 1
 
        assert perm[0].permission.permission_name == 'repository.none'
 

	
 
        # update this permission back
 
        perm[0].permission = Permission.get_by_key('repository.read')
 
        Session().commit()
 

	
 
    def test_set_repo_fork_has_no_self_id(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        response = self.app.get(base.url('edit_repo_advanced', repo_name=self.REPO))
 
        opt = """<option value="%s">%s</option>""" % (repo.repo_id, self.REPO)
 
        response.mustcontain(no=[opt])
 

	
 
    def test_set_fork_of_other_repo(self):
 
        self.log_user()
 
        other_repo = 'other_%s' % self.REPO_TYPE
 
        fixture.create_repo(other_repo, repo_type=self.REPO_TYPE)
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(other_repo)
 
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo2.repo_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(other_repo)
 
        self.checkSessionFlash(response,
 
            'Marked repository %s as fork of %s' % (repo.repo_name, repo2.repo_name))
 

	
 
        assert repo.fork == repo2
 
        response = response.follow()
 
        # check if given repo is selected
 

	
 
        opt = """<option value="%s" selected="selected">%s</option>""" % (
 
                    repo2.repo_id, repo2.repo_name)
 
        response.mustcontain(opt)
 

	
 
        fixture.destroy_repo(other_repo, forks='detach')
 

	
 
    def test_set_fork_of_other_type_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
 
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo2.repo_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
 
        self.checkSessionFlash(response,
 
            'Cannot set repository as fork of repository with other type')
 

	
 
    def test_set_fork_of_none(self):
 
        self.log_user()
 
        ## mark it as None
 
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=None, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
 
        self.checkSessionFlash(response,
 
                               'Marked repository %s as fork of %s'
 
                               % (repo.repo_name, "Nothing"))
 
        assert repo.fork is None
 

	
 
    def test_set_fork_of_same_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo.repo_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        self.checkSessionFlash(response,
 
                               'An error occurred during this operation')
 

	
 
    def test_create_on_top_level_without_permissions(self):
 
        usr = self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
 
        # revoke
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
 
        user_model.revoke_perm(User.DEFAULT_USER_NAME, 'hg.create.repository')
 
        user_model.grant_perm(User.DEFAULT_USER_NAME, 'hg.create.none')
 
        user_model.revoke_perm(User.DEFAULT_USER_NAME, 'hg.fork.repository')
 
        user_model.grant_perm(User.DEFAULT_USER_NAME, 'hg.fork.none')
 

	
 
        # disable on regular user
 
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
 
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.none')
 
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
 
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
 
        Session().commit()
 

	
 

	
 
        user = User.get(usr['user_id'])
 

	
 
        repo_name = self.NEW_REPO + 'no_perms'
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        response.mustcontain('<span class="error-message">Invalid value</span>')
 

	
 
        RepoModel().delete(repo_name)
 
        Session().commit()
 

	
 
    @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
 
    def test_create_repo_when_filesystem_op_fails(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 

	
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        self.checkSessionFlash(response,
 
                               'Error creating repository %s' % repo_name)
 
        # repo must not be in db
 
        repo = Repository.get_by_repo_name(repo_name)
 
        assert repo is None
 

	
 
        # repo must not be in filesystem !
 
        assert not os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name))
 

	
 

	
 
class TestAdminReposControllerGIT(_BaseTestCase):
 
    REPO = base.GIT_REPO
 
    REPO_TYPE = 'git'
 
    NEW_REPO = base.NEW_GIT_REPO
 
    OTHER_TYPE_REPO = base.HG_REPO
 
    OTHER_TYPE = 'hg'
 

	
 

	
 
class TestAdminReposControllerHG(_BaseTestCase):
 
    REPO = base.HG_REPO
 
    REPO_TYPE = 'hg'
 
    NEW_REPO = base.NEW_HG_REPO
 
    OTHER_TYPE_REPO = base.GIT_REPO
 
    OTHER_TYPE = 'git'
 

	
 
    def test_permanent_url_protocol_access(self):
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        permanent_name = '_%d' % repo.repo_id
 

	
 
        # 400 Bad Request - Unable to detect pull/push action
 
        self.app.get(base.url('summary_home', repo_name=permanent_name),
 
            extra_environ={'HTTP_ACCEPT': 'application/mercurial'},
 
            status=400,
 
        )
kallithea/tests/models/test_permissions.py
Show inline comments
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model import db
 
from kallithea.model.db import Permission, User, UserGroupRepoGroupToPerm, UserToPerm
 
from kallithea.model.meta import Session
 
from kallithea.model.permission import PermissionModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestPermissions(base.TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        # recreate default user to get a clean start
 
        PermissionModel().create_default_permissions(user=User.DEFAULT_USER,
 
        PermissionModel().create_default_permissions(user=User.DEFAULT_USER_NAME,
 
                                                     force=True)
 
        Session().commit()
 

	
 
    def setup_method(self, method):
 
        self.u1 = UserModel().create_or_update(
 
            username='u1', password='qweqwe',
 
            email='u1@example.com', firstname='u1', lastname='u1'
 
        )
 
        self.u2 = UserModel().create_or_update(
 
            username='u2', password='qweqwe',
 
            email='u2@example.com', firstname='u2', lastname='u2'
 
        )
 
        self.u3 = UserModel().create_or_update(
 
            username='u3', password='qweqwe',
 
            email='u3@example.com', firstname='u3', lastname='u3'
 
        )
 
        self.anon = User.get_default_user()
 
        self.a1 = UserModel().create_or_update(
 
            username='a1', password='qweqwe',
 
            email='a1@example.com', firstname='a1', lastname='a1', admin=True
 
        )
 
        Session().commit()
 

	
 
    def teardown_method(self, method):
 
        if hasattr(self, 'test_repo'):
 
            RepoModel().delete(repo=self.test_repo)
 

	
 
        UserModel().delete(self.u1)
 
        UserModel().delete(self.u2)
 
        UserModel().delete(self.u3)
 
        UserModel().delete(self.a1)
 

	
 
        Session().commit() # commit early to avoid SQLAlchemy warning from double cascade delete to users_groups_members
 

	
 
        if hasattr(self, 'g1'):
 
            RepoGroupModel().delete(self.g1.group_id)
 
        if hasattr(self, 'g2'):
 
            RepoGroupModel().delete(self.g2.group_id)
 

	
 
        if hasattr(self, 'ug2'):
 
            UserGroupModel().delete(self.ug2, force=True)
 
        if hasattr(self, 'ug1'):
 
            UserGroupModel().delete(self.ug1, force=True)
 

	
 
        Session().commit()
 

	
 
    def test_default_perms_set(self):
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.create.repository', 'repository.read',
 
                           'hg.register.manual_activate']),
 
            'repositories': {base.HG_REPO: 'repository.read'}
 
        }
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == perms['repositories'][base.HG_REPO]
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=base.HG_REPO, user=self.u1,
 
                                          perm=new_perm)
 
        Session().commit()
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == new_perm
 

	
 
    def test_default_admin_perms_set(self):
 
        a1_auth = AuthUser(user_id=self.a1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.admin', 'hg.create.write_on_repogroup.true']),
 
            'repositories': {base.HG_REPO: 'repository.admin'}
 
        }
 
        assert a1_auth.permissions['repositories'][base.HG_REPO] == perms['repositories'][base.HG_REPO]
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=base.HG_REPO, user=self.a1,
 
                                          perm=new_perm)
 
        Session().commit()
 
        # cannot really downgrade admins permissions !? they still gets set as
 
        # admin !
 
        u1_auth = AuthUser(user_id=self.a1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == perms['repositories'][base.HG_REPO]
 

	
 
    def test_default_group_perms(self):
 
        self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {'test1': 'group.read', 'test2': 'group.read'},
 
            'global': set(Permission.DEFAULT_USER_PERMISSIONS),
 
            'repositories': {base.HG_REPO: 'repository.read'}
 
        }
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == perms['repositories'][base.HG_REPO]
 
        assert u1_auth.permissions['repositories_groups'] == perms['repositories_groups']
 
        assert u1_auth.permissions['global'] == perms['global']
 

	
 
    def test_default_admin_group_perms(self):
 
        self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
0 comments (0 inline, 0 general)