Changeset - 702da441f5c4
[Not reviewed]
default
0 4 0
Marcin Kuzminski - 13 years ago 2013-03-06 00:20:13
marcin@python-works.com
Grafted from: 94f251fda314
fixed issue with renaming repos group together with changing parents with multiple nested trees
added regresion tests for such cases
4 files changed with 160 insertions and 47 deletions:
0 comments (0 inline, 0 general)
rhodecode/model/db.py
Show inline comments
 
@@ -1098,402 +1098,412 @@ class Repository(Base, BaseModel):
 
        set a cache for invalidation for this instance
 
        """
 
        CacheInvalidation.set_invalidate(repo_name=self.repo_name)
 

	
 
    @LazyProperty
 
    def scm_instance_no_cache(self):
 
        return self.__get_instance()
 

	
 
    @LazyProperty
 
    def scm_instance(self):
 
        import rhodecode
 
        full_cache = str2bool(rhodecode.CONFIG.get('vcs_full_cache'))
 
        if full_cache:
 
            return self.scm_instance_cached()
 
        return self.__get_instance()
 

	
 
    def scm_instance_cached(self, cache_map=None):
 
        @cache_region('long_term')
 
        def _c(repo_name):
 
            return self.__get_instance()
 
        rn = self.repo_name
 
        log.debug('Getting cached instance of repo')
 

	
 
        if cache_map:
 
            # get using prefilled cache_map
 
            invalidate_repo = cache_map[self.repo_name]
 
            if invalidate_repo:
 
                invalidate_repo = (None if invalidate_repo.cache_active
 
                                   else invalidate_repo)
 
        else:
 
            # get from invalidate
 
            invalidate_repo = self.invalidate
 

	
 
        if invalidate_repo is not None:
 
            region_invalidate(_c, None, rn)
 
            # update our cache
 
            CacheInvalidation.set_valid(invalidate_repo.cache_key)
 
        return _c(rn)
 

	
 
    def __get_instance(self):
 
        repo_full_path = self.repo_full_path
 
        try:
 
            alias = get_scm(repo_full_path)[0]
 
            log.debug('Creating instance of %s repository' % alias)
 
            backend = get_backend(alias)
 
        except VCSError:
 
            log.error(traceback.format_exc())
 
            log.error('Perhaps this repository is in db and not in '
 
                      'filesystem run rescan repositories with '
 
                      '"destroy old data " option from admin panel')
 
            return
 

	
 
        if alias == 'hg':
 

	
 
            repo = backend(safe_str(repo_full_path), create=False,
 
                           baseui=self._ui)
 
            # skip hidden web repository
 
            if repo._get_hidden():
 
                return
 
        else:
 
            repo = backend(repo_full_path, create=False)
 

	
 
        return repo
 

	
 

	
 
class RepoGroup(Base, BaseModel):
 
    __tablename__ = 'groups'
 
    __table_args__ = (
 
        UniqueConstraint('group_name', 'group_parent_id'),
 
        CheckConstraint('group_id != group_parent_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 
    __mapper_args__ = {'order_by': 'group_name'}
 

	
 
    group_id = Column("group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    group_name = Column("group_name", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
 
    group_parent_id = Column("group_parent_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=None, default=None)
 
    group_description = Column("group_description", String(10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    enable_locking = Column("enable_locking", Boolean(), nullable=False, unique=None, default=False)
 

	
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', cascade='all', order_by='UserRepoGroupToPerm.group_to_perm_id')
 
    users_group_to_perm = relationship('UsersGroupRepoGroupToPerm', cascade='all')
 

	
 
    parent_group = relationship('RepoGroup', remote_side=group_id)
 

	
 
    def __init__(self, group_name='', parent_group=None):
 
        self.group_name = group_name
 
        self.parent_group = parent_group
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (self.__class__.__name__, self.group_id,
 
                                  self.group_name)
 

	
 
    @classmethod
 
    def groups_choices(cls, check_perms=False):
 
        from webhelpers.html import literal as _literal
 
        from rhodecode.model.scm import ScmModel
 
        groups = cls.query().all()
 
        if check_perms:
 
            #filter group user have access to, it's done
 
            #magically inside ScmModel based on current user
 
            groups = ScmModel().get_repos_groups(groups)
 
        repo_groups = [('', '')]
 
        sep = ' &raquo; '
 
        _name = lambda k: _literal(sep.join(k))
 

	
 
        repo_groups.extend([(x.group_id, _name(x.full_path_splitted))
 
                              for x in groups])
 

	
 
        repo_groups = sorted(repo_groups, key=lambda t: t[1].split(sep)[0])
 
        return repo_groups
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
 
        if case_insensitive:
 
            gr = cls.query()\
 
                .filter(cls.group_name.ilike(group_name))
 
        else:
 
            gr = cls.query()\
 
                .filter(cls.group_name == group_name)
 
        if cache:
 
            gr = gr.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                            )
 
            )
 
        return gr.scalar()
 

	
 
    @property
 
    def parents(self):
 
        parents_recursion_limit = 5
 
        groups = []
 
        if self.parent_group is None:
 
            return groups
 
        cur_gr = self.parent_group
 
        groups.insert(0, cur_gr)
 
        cnt = 0
 
        while 1:
 
            cnt += 1
 
            gr = getattr(cur_gr, 'parent_group', None)
 
            cur_gr = cur_gr.parent_group
 
            if gr is None:
 
                break
 
            if cnt == parents_recursion_limit:
 
                # this will prevent accidental infinit loops
 
                log.error('group nested more than %s' %
 
                          parents_recursion_limit)
 
                break
 

	
 
            groups.insert(0, gr)
 
        return groups
 

	
 
    @property
 
    def children(self):
 
        return RepoGroup.query().filter(RepoGroup.parent_group == self)
 

	
 
    @property
 
    def name(self):
 
        return self.group_name.split(RepoGroup.url_sep())[-1]
 

	
 
    @property
 
    def full_path(self):
 
        return self.group_name
 

	
 
    @property
 
    def full_path_splitted(self):
 
        return self.group_name.split(RepoGroup.url_sep())
 

	
 
    @property
 
    def repositories(self):
 
        return Repository.query()\
 
                .filter(Repository.group == self)\
 
                .order_by(Repository.repo_name)
 

	
 
    @property
 
    def repositories_recursive_count(self):
 
        cnt = self.repositories.count()
 

	
 
        def children_count(group):
 
            cnt = 0
 
            for child in group.children:
 
                cnt += child.repositories.count()
 
                cnt += children_count(child)
 
            return cnt
 

	
 
        return cnt + children_count(self)
 

	
 
    def recursive_groups_and_repos(self):
 
        """
 
        Recursive return all groups, with repositories in those groups
 
        """
 
    def _recursive_objects(self, include_repos=True):
 
        all_ = []
 

	
 
        def _get_members(root_gr):
 
            for r in root_gr.repositories:
 
                all_.append(r)
 
            if include_repos:
 
                for r in root_gr.repositories:
 
                    all_.append(r)
 
            childs = root_gr.children.all()
 
            if childs:
 
                for gr in childs:
 
                    all_.append(gr)
 
                    _get_members(gr)
 

	
 
        _get_members(self)
 
        return [self] + all_
 

	
 
    def recursive_groups_and_repos(self):
 
        """
 
        Recursive return all groups, with repositories in those groups
 
        """
 
        return self._recursive_objects()
 

	
 
    def recursive_groups(self):
 
        """
 
        Returns all children groups for this group including children of children 
 
        """
 
        return self._recursive_objects(include_repos=False)
 

	
 
    def get_new_name(self, group_name):
 
        """
 
        returns new full group name based on parent and new name
 

	
 
        :param group_name:
 
        """
 
        path_prefix = (self.parent_group.full_path_splitted if
 
                       self.parent_group else [])
 
        return RepoGroup.url_sep().join(path_prefix + [group_name])
 

	
 

	
 
class Permission(Base, BaseModel):
 
    __tablename__ = 'permissions'
 
    __table_args__ = (
 
        Index('p_perm_name_idx', 'permission_name'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 
    PERMS = [
 
        ('repository.none', _('Repository no access')),
 
        ('repository.read', _('Repository read access')),
 
        ('repository.write', _('Repository write access')),
 
        ('repository.admin', _('Repository admin access')),
 

	
 
        ('group.none', _('Repositories Group no access')),
 
        ('group.read', _('Repositories Group read access')),
 
        ('group.write', _('Repositories Group write access')),
 
        ('group.admin', _('Repositories Group admin access')),
 

	
 
        ('hg.admin', _('RhodeCode Administrator')),
 
        ('hg.create.none', _('Repository creation disabled')),
 
        ('hg.create.repository', _('Repository creation enabled')),
 
        ('hg.fork.none', _('Repository forking disabled')),
 
        ('hg.fork.repository', _('Repository forking enabled')),
 
        ('hg.register.none', _('Register disabled')),
 
        ('hg.register.manual_activate', _('Register new user with RhodeCode '
 
                                          'with manual activation')),
 

	
 
        ('hg.register.auto_activate', _('Register new user with RhodeCode '
 
                                        'with auto activation')),
 
    ]
 

	
 
    # defines which permissions are more important higher the more important
 
    PERM_WEIGHTS = {
 
        'repository.none': 0,
 
        'repository.read': 1,
 
        'repository.write': 3,
 
        'repository.admin': 4,
 

	
 
        'group.none': 0,
 
        'group.read': 1,
 
        'group.write': 3,
 
        'group.admin': 4,
 

	
 
        'hg.fork.none': 0,
 
        'hg.fork.repository': 1,
 
        'hg.create.none': 0,
 
        'hg.create.repository':1
 
    }
 

	
 
    permission_id = Column("permission_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    permission_name = Column("permission_name", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    permission_longname = Column("permission_longname", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (
 
            self.__class__.__name__, self.permission_id, self.permission_name
 
        )
 

	
 
    @classmethod
 
    def get_by_key(cls, key):
 
        return cls.query().filter(cls.permission_name == key).scalar()
 

	
 
    @classmethod
 
    def get_default_perms(cls, default_user_id):
 
        q = Session().query(UserRepoToPerm, Repository, cls)\
 
         .join((Repository, UserRepoToPerm.repository_id == Repository.repo_id))\
 
         .join((cls, UserRepoToPerm.permission_id == cls.permission_id))\
 
         .filter(UserRepoToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 
    @classmethod
 
    def get_default_group_perms(cls, default_user_id):
 
        q = Session().query(UserRepoGroupToPerm, RepoGroup, cls)\
 
         .join((RepoGroup, UserRepoGroupToPerm.group_id == RepoGroup.group_id))\
 
         .join((cls, UserRepoGroupToPerm.permission_id == cls.permission_id))\
 
         .filter(UserRepoGroupToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 

	
 
class UserRepoToPerm(Base, BaseModel):
 
    __tablename__ = 'repo_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'repository_id', 'permission_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    repo_to_perm_id = Column("repo_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 
    repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, user, repository, permission):
 
        n = cls()
 
        n.user = user
 
        n.repository = repository
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<user:%s => %s >' % (self.user, self.repository)
 

	
 

	
 
class UserToPerm(Base, BaseModel):
 
    __tablename__ = 'user_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'permission_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    user_to_perm_id = Column("user_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 

	
 
    user = relationship('User')
 
    permission = relationship('Permission', lazy='joined')
 

	
 

	
 
class UsersGroupRepoToPerm(Base, BaseModel):
 
    __tablename__ = 'users_group_repo_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'users_group_id', 'permission_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    users_group_to_perm_id = Column("users_group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 
    repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
 

	
 
    users_group = relationship('UsersGroup')
 
    permission = relationship('Permission')
 
    repository = relationship('Repository')
 

	
 
    @classmethod
 
    def create(cls, users_group, repository, permission):
 
        n = cls()
 
        n.users_group = users_group
 
        n.repository = repository
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<userGroup:%s => %s >' % (self.users_group, self.repository)
 

	
 

	
 
class UsersGroupToPerm(Base, BaseModel):
 
    __tablename__ = 'users_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('users_group_id', 'permission_id',),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    users_group_to_perm_id = Column("users_group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 

	
 
    users_group = relationship('UsersGroup')
 
    permission = relationship('Permission')
 

	
 

	
 
class UserRepoGroupToPerm(Base, BaseModel):
 
    __tablename__ = 'user_repo_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'group_id', 'permission_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 

	
 
    group_to_perm_id = Column("group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
 
    group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
rhodecode/model/repos_group.py
Show inline comments
 
@@ -55,367 +55,377 @@ class ReposGroupModel(BaseModel):
 
        """
 
        Get's the repositories root path from database
 
        """
 

	
 
        q = RhodeCodeUi.get_by_key('/')
 
        return q.ui_value
 

	
 
    def _create_default_perms(self, new_group):
 
        # create default permission
 
        repo_group_to_perm = UserRepoGroupToPerm()
 
        default_perm = 'group.read'
 
        for p in User.get_by_username('default').user_perms:
 
            if p.permission.permission_name.startswith('group.'):
 
                default_perm = p.permission.permission_name
 
                break
 

	
 
        repo_group_to_perm.permission_id = self.sa.query(Permission)\
 
                .filter(Permission.permission_name == default_perm)\
 
                .one().permission_id
 

	
 
        repo_group_to_perm.group = new_group
 
        repo_group_to_perm.user_id = User.get_by_username('default').user_id
 

	
 
        self.sa.add(repo_group_to_perm)
 

	
 
    def __create_group(self, group_name):
 
        """
 
        makes repositories group on filesystem
 

	
 
        :param repo_name:
 
        :param parent_id:
 
        """
 

	
 
        create_path = os.path.join(self.repos_path, group_name)
 
        log.debug('creating new group in %s' % create_path)
 

	
 
        if os.path.isdir(create_path):
 
            raise Exception('That directory already exists !')
 

	
 
        os.makedirs(create_path)
 

	
 
    def __rename_group(self, old, new):
 
        """
 
        Renames a group on filesystem
 

	
 
        :param group_name:
 
        """
 

	
 
        if old == new:
 
            log.debug('skipping group rename')
 
            return
 

	
 
        log.debug('renaming repos group from %s to %s' % (old, new))
 

	
 
        old_path = os.path.join(self.repos_path, old)
 
        new_path = os.path.join(self.repos_path, new)
 

	
 
        log.debug('renaming repos paths from %s to %s' % (old_path, new_path))
 

	
 
        if os.path.isdir(new_path):
 
            raise Exception('Was trying to rename to already '
 
                            'existing dir %s' % new_path)
 
        shutil.move(old_path, new_path)
 

	
 
    def __delete_group(self, group, force_delete=False):
 
        """
 
        Deletes a group from a filesystem
 

	
 
        :param group: instance of group from database
 
        :param force_delete: use shutil rmtree to remove all objects
 
        """
 
        paths = group.full_path.split(RepoGroup.url_sep())
 
        paths = os.sep.join(paths)
 

	
 
        rm_path = os.path.join(self.repos_path, paths)
 
        log.info("Removing group %s" % (rm_path))
 
        # delete only if that path really exists
 
        if os.path.isdir(rm_path):
 
            if force_delete:
 
                shutil.rmtree(rm_path)
 
            else:
 
                #archive that group`
 
                _now = datetime.datetime.now()
 
                _ms = str(_now.microsecond).rjust(6, '0')
 
                _d = 'rm__%s_GROUP_%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                                          group.name)
 
                shutil.move(rm_path, os.path.join(self.repos_path, _d))
 

	
 
    def create(self, group_name, group_description, parent=None, just_db=False):
 
        try:
 
            new_repos_group = RepoGroup()
 
            new_repos_group.group_description = group_description
 
            new_repos_group.parent_group = self._get_repos_group(parent)
 
            new_repos_group.group_name = new_repos_group.get_new_name(group_name)
 

	
 
            self.sa.add(new_repos_group)
 
            self._create_default_perms(new_repos_group)
 

	
 
            if not just_db:
 
                # we need to flush here, in order to check if database won't
 
                # throw any exceptions, create filesystem dirs at the very end
 
                self.sa.flush()
 
                self.__create_group(new_repos_group.group_name)
 

	
 
            return new_repos_group
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _update_permissions(self, repos_group, perms_new=None,
 
                            perms_updates=None, recursive=False):
 
        from rhodecode.model.repo import RepoModel
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        def _set_perm_user(obj, user, perm):
 
            if isinstance(obj, RepoGroup):
 
                ReposGroupModel().grant_user_permission(
 
                    repos_group=obj, user=user, perm=perm
 
                )
 
            elif isinstance(obj, Repository):
 
                #we do this ONLY IF repository is non-private
 
                if obj.private:
 
                    return
 

	
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_user_permission(
 
                    repo=obj, user=user, perm=perm
 
                )
 

	
 
        def _set_perm_group(obj, users_group, perm):
 
            if isinstance(obj, RepoGroup):
 
                ReposGroupModel().grant_users_group_permission(
 
                    repos_group=obj, group_name=users_group, perm=perm
 
                )
 
            elif isinstance(obj, Repository):
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_users_group_permission(
 
                    repo=obj, group_name=users_group, perm=perm
 
                )
 
        updates = []
 
        log.debug('Now updating permissions for %s in recursive mode:%s'
 
                  % (repos_group, recursive))
 

	
 
        for obj in repos_group.recursive_groups_and_repos():
 
            #obj is an instance of a group or repositories in that group
 
            if not recursive:
 
                obj = repos_group
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for users group
 
                else:
 
                    _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            #if it's not recursive call
 
            # break the loop and don't proceed with other changes
 
            if not recursive:
 
                break
 
        return updates
 

	
 
    def update(self, repos_group_id, form_data):
 

	
 
        try:
 
            repos_group = RepoGroup.get(repos_group_id)
 
            recursive = form_data['recursive']
 
            # iterate over all members(if in recursive mode) of this groups and
 
            # set the permissions !
 
            # this can be potentially heavy operation
 
            self._update_permissions(repos_group, form_data['perms_new'],
 
                                     form_data['perms_updates'], recursive)
 

	
 
            old_path = repos_group.full_path
 

	
 
            # change properties
 
            repos_group.group_description = form_data['group_description']
 
            repos_group.parent_group = RepoGroup.get(form_data['group_parent_id'])
 
            repos_group.group_parent_id = form_data['group_parent_id']
 
            repos_group.enable_locking = form_data['enable_locking']
 

	
 
            repos_group.parent_group = RepoGroup.get(form_data['group_parent_id'])
 
            repos_group.group_name = repos_group.get_new_name(form_data['group_name'])
 
            new_path = repos_group.full_path
 

	
 
            self.sa.add(repos_group)
 

	
 
            # iterate over all members of this groups and set the locking !
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
 
            # to the parent
 
            # if obj is a Repo fix it's name
 
            # this can be potentially heavy operation
 
            for obj in repos_group.recursive_groups_and_repos():
 
                #set the value from it's parent
 
                obj.enable_locking = repos_group.enable_locking
 
                if isinstance(obj, RepoGroup):
 
                    new_name = obj.get_new_name(obj.name)
 
                    log.debug('Fixing group %s to new name %s' \
 
                                % (obj.group_name, new_name))
 
                    obj.group_name = new_name
 
                elif isinstance(obj, Repository):
 
                    # we need to get all repositories from this new group and
 
                    # rename them accordingly to new group path
 
                    new_name = obj.get_new_name(obj.just_name)
 
                    log.debug('Fixing repo %s to new name %s' \
 
                                % (obj.repo_name, new_name))
 
                    obj.repo_name = new_name
 
                self.sa.add(obj)
 

	
 
            # we need to get all repositories from this new group and
 
            # rename them accordingly to new group path
 
            for r in repos_group.repositories:
 
                r.repo_name = r.get_new_name(r.just_name)
 
                self.sa.add(r)
 

	
 
            self.__rename_group(old_path, new_path)
 

	
 
            return repos_group
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, repos_group, force_delete=False):
 
        repos_group = self._get_repos_group(repos_group)
 
        try:
 
            self.sa.delete(repos_group)
 
            self.__delete_group(repos_group, force_delete)
 
        except:
 
            log.error('Error removing repos_group %s' % repos_group)
 
            raise
 

	
 
    def delete_permission(self, repos_group, obj, obj_type, recursive):
 
        """
 
        Revokes permission for repos_group for given obj(user or users_group),
 
        obj_type can be user or users group
 

	
 
        :param repos_group:
 
        :param obj: user or users group id
 
        :param obj_type: user or users group type
 
        :param recursive: recurse to all children of group
 
        """
 
        from rhodecode.model.repo import RepoModel
 
        repos_group = self._get_repos_group(repos_group)
 

	
 
        for el in repos_group.recursive_groups_and_repos():
 
            if not recursive:
 
                # if we don't recurse set the permission on only the top level
 
                # object
 
                el = repos_group
 

	
 
            if isinstance(el, RepoGroup):
 
                if obj_type == 'user':
 
                    ReposGroupModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'users_group':
 
                    ReposGroupModel().revoke_users_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            elif isinstance(el, Repository):
 
                if obj_type == 'user':
 
                    RepoModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'users_group':
 
                    RepoModel().revoke_users_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 

	
 
            #if it's not recursive call
 
            # break the loop and don't proceed with other changes
 
            if not recursive:
 
                break
 

	
 
    def grant_user_permission(self, repos_group, user, perm):
 
        """
 
        Grant permission for user on given repositories group, or update
 
        existing one if found
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 

	
 
        repos_group = self._get_repos_group(repos_group)
 
        user = self._get_user(user)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserRepoGroupToPerm)\
 
            .filter(UserRepoGroupToPerm.user == user)\
 
            .filter(UserRepoGroupToPerm.group == repos_group)\
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserRepoGroupToPerm()
 
        obj.group = repos_group
 
        obj.user = user
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, user, repos_group))
 

	
 
    def revoke_user_permission(self, repos_group, user):
 
        """
 
        Revoke permission for user on given repositories group
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        repos_group = self._get_repos_group(repos_group)
 
        user = self._get_user(user)
 

	
 
        obj = self.sa.query(UserRepoGroupToPerm)\
 
            .filter(UserRepoGroupToPerm.user == user)\
 
            .filter(UserRepoGroupToPerm.group == repos_group)\
 
            .scalar()
 
        if obj:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm on %s on %s' % (repos_group, user))
 

	
 
    def grant_users_group_permission(self, repos_group, group_name, perm):
 
        """
 
        Grant permission for users group on given repositories group, or update
 
        existing one if found
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        repos_group = self._get_repos_group(repos_group)
 
        group_name = self.__get_users_group(group_name)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UsersGroupRepoGroupToPerm)\
 
            .filter(UsersGroupRepoGroupToPerm.group == repos_group)\
 
            .filter(UsersGroupRepoGroupToPerm.users_group == group_name)\
 
            .scalar()
 

	
 
        if obj is None:
 
            # create new
 
            obj = UsersGroupRepoGroupToPerm()
 

	
 
        obj.group = repos_group
 
        obj.users_group = group_name
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, group_name, repos_group))
 

	
 
    def revoke_users_group_permission(self, repos_group, group_name):
 
        """
 
        Revoke permission for users group on given repositories group
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
        """
 
        repos_group = self._get_repos_group(repos_group)
 
        group_name = self.__get_users_group(group_name)
 

	
 
        obj = self.sa.query(UsersGroupRepoGroupToPerm)\
 
            .filter(UsersGroupRepoGroupToPerm.group == repos_group)\
 
            .filter(UsersGroupRepoGroupToPerm.users_group == group_name)\
 
            .scalar()
 
        if obj:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm to %s on %s' % (repos_group, group_name))
rhodecode/tests/__init__.py
Show inline comments
 
"""Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 
"""
 
import os
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from unittest import TestCase
 
from tempfile import _RandomNameSequence
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 

	
 
from rhodecode import is_windows
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User
 
from rhodecode.tests.nose_parametrized import parameterized
 

	
 
import pylons.test
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS', '_get_repo_create_params'
 
    'GIT_REMOTE_REPO', 'SCM_TESTS', '_get_repo_create_params',
 
    '_get_group_create_params'
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
# nosetests --pdb --pdb-failures
 
# nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from rhodecode.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.Session = Session
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['rhodecode_user']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['rhodecode_user']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def checkSessionFlash(self, response, msg):
 
        self.assertTrue('flash' in response.session)
 
        if not msg in response.session['flash'][0][1]:
 
            self.fail(
 
                'msg `%s` not found in session flash: got `%s` instead' % (
 
                      msg, response.session['flash'])
 
            )
 

	
 

	
 
## HELPERS ##
 

	
 
def _get_repo_create_params(**custom):
 
    defs = {
 
        'repo_name': None,
 
        'repo_type': 'hg',
 
        'clone_uri': '',
 
        'repo_group': '',
 
        'repo_description': 'DESC',
 
        'repo_private': False,
 
        'repo_landing_rev': 'tip'
 
    }
 
    defs.update(custom)
 
    if 'repo_name_full' not in custom:
 
        defs.update({'repo_name_full': defs['repo_name']})
 

	
 
    return defs
 

	
 

	
 
def _get_group_create_params(**custom):
 
    defs = dict(
 
        group_name=None,
 
        group_description='DESC',
 
        group_parent_id=None,
 
        perms_updates=[],
 
        perms_new=[],
 
        enable_locking=False,
 
        recursive=False
 
    )
 
    defs.update(custom)
 

	
 
    return defs
rhodecode/tests/models/test_repos_groups.py
Show inline comments
 
import os
 
import unittest
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User
 
from rhodecode.model.meta import Session
 
from sqlalchemy.exc import IntegrityError
 

	
 

	
 
def _make_group(path, desc='desc', parent_id=None,
 
                 skip_if_exists=False):
 

	
 
    gr = RepoGroup.get_by_group_name(path)
 
    if gr and skip_if_exists:
 
        return gr
 
    if isinstance(parent_id, RepoGroup):
 
        parent_id = parent_id.group_id
 
    gr = ReposGroupModel().create(path, desc, parent_id)
 
    return gr
 

	
 

	
 
def _update_group(id_, group_name, desc='desc', parent_id=None):
 
    form_data = _get_group_create_params(group_name=group_name,
 
                                         group_desc=desc,
 
                                         group_parent_id=parent_id)
 
    gr = ReposGroupModel().update(id_, form_data)
 
    return gr
 

	
 

	
 
def _make_repo(name, **kwargs):
 
    form_data = _get_repo_create_params(repo_name=name, **kwargs)
 
    cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
    r = RepoModel().create(form_data, cur_user)
 
    return r
 

	
 

	
 
def _update_repo(name, **kwargs):
 
    form_data = _get_repo_create_params(**kwargs)
 
    if not 'repo_name' in kwargs:
 
        form_data['repo_name'] = name
 
    if not 'perms_new' in kwargs:
 
        form_data['perms_new'] = []
 
    if not 'perms_updates' in kwargs:
 
        form_data['perms_updates'] = []
 
    r = RepoModel().update(name, **form_data)
 
    return r
 

	
 

	
 
class TestReposGroups(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.g1 = _make_group('test1', skip_if_exists=True)
 
        Session().commit()
 
        self.g2 = _make_group('test2', skip_if_exists=True)
 
        Session().commit()
 
        self.g3 = _make_group('test3', skip_if_exists=True)
 
        Session().commit()
 

	
 
    def tearDown(self):
 
        print 'out'
 
        Session.remove()
 

	
 
    def __check_path(self, *path):
 
        """
 
        Checks the path for existance !
 
        """
 
        path = [TESTS_TMP_PATH] + list(path)
 
        path = os.path.join(*path)
 
        return os.path.isdir(path)
 

	
 
    def _check_folders(self):
 
        print os.listdir(TESTS_TMP_PATH)
 

	
 
    def __delete_group(self, id_):
 
        ReposGroupModel().delete(id_)
 

	
 
    def __update_group(self, id_, path, desc='desc', parent_id=None):
 
        form_data = dict(
 
            group_name=path,
 
            group_description=desc,
 
            group_parent_id=parent_id,
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        gr = ReposGroupModel().update(id_, form_data)
 
        return gr
 

	
 
    def test_create_group(self):
 
        g = _make_group('newGroup')
 
        Session().commit()
 
        self.assertEqual(g.full_path, 'newGroup')
 

	
 
        self.assertTrue(self.__check_path('newGroup'))
 

	
 
    def test_create_same_name_group(self):
 
        self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
 
        Session().rollback()
 

	
 
    def test_same_subgroup(self):
 
        sg1 = _make_group('sub1', parent_id=self.g1.group_id)
 
        Session().commit()
 
        self.assertEqual(sg1.parent_group, self.g1)
 
        self.assertEqual(sg1.full_path, 'test1/sub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1'))
 

	
 
        ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
 
        Session().commit()
 
        self.assertEqual(ssg1.parent_group, sg1)
 
        self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
 

	
 
    def test_remove_group(self):
 
        sg1 = _make_group('deleteme')
 
        Session().commit()
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('deteteme'))
 

	
 
        sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
 
        Session().commit()
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('test1', 'deteteme'))
 

	
 
    def test_rename_single_group(self):
 
        sg1 = _make_group('initial')
 
        Session().commit()
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after')
 
        new_sg1 = _update_group(sg1.group_id, 'after')
 
        self.assertTrue(self.__check_path('after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
 

	
 
    def test_update_group_parent(self):
 

	
 
        sg1 = _make_group('initial', parent_id=self.g1.group_id)
 
        Session().commit()
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
 
        new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
 
        self.assertTrue(self.__check_path('test1', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
 
        new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
 
        self.assertTrue(self.__check_path('test3', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'hello')
 
        new_sg1 = _update_group(sg1.group_id, 'hello')
 
        self.assertTrue(self.__check_path('hello'))
 

	
 
        self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
 

	
 
    def test_subgrouping_with_repo(self):
 

	
 
        g1 = _make_group('g1')
 
        g2 = _make_group('g2')
 

	
 
        Session().commit()
 
        # create new repo
 
        form_data = _get_repo_create_params(repo_name='john')
 
        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        r = RepoModel().create(form_data, cur_user)
 

	
 
        r = _make_repo('john')
 
        Session().commit()
 
        self.assertEqual(r.repo_name, 'john')
 

	
 
        # put repo into group
 
        form_data = form_data
 
        form_data['repo_group'] = g1.group_id
 
        form_data['perms_new'] = []
 
        form_data['perms_updates'] = []
 
        RepoModel().update(r.repo_name, **form_data)
 
        r = _update_repo('john', repo_group=g1.group_id)
 
        Session().commit()
 
        self.assertEqual(r.repo_name, 'g1/john')
 

	
 
        self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        _update_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        self.assertTrue(self.__check_path('g2', 'g1'))
 

	
 
        # test repo
 
        self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
 
                                                                r.just_name]))
 

	
 
    def test_move_to_root(self):
 
        g1 = _make_group('t11')
 
        Session().commit()
 
        g2 = _make_group('t22', parent_id=g1.group_id)
 
        Session().commit()
 

	
 
        self.assertEqual(g2.full_path, 't11/t22')
 
        self.assertTrue(self.__check_path('t11', 't22'))
 

	
 
        g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
 
        g2 = _update_group(g2.group_id, 'g22', parent_id=None)
 
        Session().commit()
 

	
 
        self.assertEqual(g2.group_name, 'g22')
 
        # we moved out group from t1 to '' so it's full path should be 'g2'
 
        self.assertEqual(g2.full_path, 'g22')
 
        self.assertFalse(self.__check_path('t11', 't22'))
 
        self.assertTrue(self.__check_path('g22'))
 

	
 
    def test_rename_top_level_group_in_nested_setup(self):
 
        g1 = _make_group('L1')
 
        Session().commit()
 
        g2 = _make_group('L2', parent_id=g1.group_id)
 
        Session().commit()
 
        g3 = _make_group('L3', parent_id=g2.group_id)
 
        Session().commit()
 

	
 
        r = _make_repo('L1/L2/L3/L3_REPO', repo_group=g3.group_id)
 
        Session().commit()
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, 'L1_NEW')
 
        Session().commit()
 
        self.assertEqual(g1.full_path, 'L1_NEW')
 
        self.assertEqual(g2.full_path, 'L1_NEW/L2')
 
        self.assertEqual(g3.full_path, 'L1_NEW/L2/L3')
 
        self.assertEqual(r.repo_name,  'L1_NEW/L2/L3/L3_REPO')
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup(self):
 
        g1 = _make_group('R1')
 
        Session().commit()
 
        g2 = _make_group('R2', parent_id=g1.group_id)
 
        Session().commit()
 
        g3 = _make_group('R3', parent_id=g2.group_id)
 
        Session().commit()
 

	
 
        g4 = _make_group('R1_NEW')
 
        Session().commit()
 

	
 
        r = _make_repo('R1/R2/R3/R3_REPO', repo_group=g3.group_id)
 
        Session().commit()
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, 'R1', parent_id=g4.group_id)
 
        Session().commit()
 
        self.assertEqual(g1.full_path, 'R1_NEW/R1')
 
        self.assertEqual(g2.full_path, 'R1_NEW/R1/R2')
 
        self.assertEqual(g3.full_path, 'R1_NEW/R1/R2/R3')
 
        self.assertEqual(r.repo_name,  'R1_NEW/R1/R2/R3/R3_REPO')
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup_with_rename(self):
 
        g1 = _make_group('X1')
 
        Session().commit()
 
        g2 = _make_group('X2', parent_id=g1.group_id)
 
        Session().commit()
 
        g3 = _make_group('X3', parent_id=g2.group_id)
 
        Session().commit()
 

	
 
        g4 = _make_group('X1_NEW')
 
        Session().commit()
 

	
 
        r = _make_repo('X1/X2/X3/X3_REPO', repo_group=g3.group_id)
 
        Session().commit()
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, 'X1_PRIM', parent_id=g4.group_id)
 
        Session().commit()
 
        self.assertEqual(g1.full_path, 'X1_NEW/X1_PRIM')
 
        self.assertEqual(g2.full_path, 'X1_NEW/X1_PRIM/X2')
 
        self.assertEqual(g3.full_path, 'X1_NEW/X1_PRIM/X2/X3')
 
        self.assertEqual(r.repo_name,  'X1_NEW/X1_PRIM/X2/X3/X3_REPO')
0 comments (0 inline, 0 general)