Changeset - 702da441f5c4
[Not reviewed]
default
0 4 0
Marcin Kuzminski - 13 years ago 2013-03-06 00:20:13
marcin@python-works.com
Grafted from: 94f251fda314
fixed issue with renaming repos group together with changing parents with multiple nested trees
added regresion tests for such cases
4 files changed with 160 insertions and 47 deletions:
0 comments (0 inline, 0 general)
rhodecode/model/db.py
Show inline comments
 
@@ -906,786 +906,796 @@ class Repository(Base, BaseModel):
 
        # inject ui extra param to log this action via push logger
 
        for k, v in extras.items():
 
            repo._repo.ui.setconfig('rhodecode_extras', k, v)
 

	
 
    @classmethod
 
    def is_valid(cls, repo_name):
 
        """
 
        returns True if given repo name is a valid filesystem repository
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        from rhodecode.lib.utils import is_valid_repo
 

	
 
        return is_valid_repo(repo_name, cls.base_path())
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating repo api data
 

	
 
        """
 
        repo = self
 
        data = dict(
 
            repo_id=repo.repo_id,
 
            repo_name=repo.repo_name,
 
            repo_type=repo.repo_type,
 
            clone_uri=repo.clone_uri,
 
            private=repo.private,
 
            created_on=repo.created_on,
 
            description=repo.description,
 
            landing_rev=repo.landing_rev,
 
            owner=repo.user.username,
 
            fork_of=repo.fork.repo_name if repo.fork else None,
 
            enable_statistics=repo.enable_statistics,
 
            enable_locking=repo.enable_locking,
 
            enable_downloads=repo.enable_downloads,
 
            last_changeset=repo.changeset_cache
 
        )
 

	
 
        return data
 

	
 
    @classmethod
 
    def lock(cls, repo, user_id):
 
        repo.locked = [user_id, time.time()]
 
        Session().add(repo)
 
        Session().commit()
 

	
 
    @classmethod
 
    def unlock(cls, repo):
 
        repo.locked = None
 
        Session().add(repo)
 
        Session().commit()
 

	
 
    @property
 
    def last_db_change(self):
 
        return self.updated_on
 

	
 
    def clone_url(self, **override):
 
        from pylons import url
 
        from urlparse import urlparse
 
        import urllib
 
        parsed_url = urlparse(url('home', qualified=True))
 
        default_clone_uri = '%(scheme)s://%(user)s%(pass)s%(netloc)s%(prefix)s%(path)s'
 
        decoded_path = safe_unicode(urllib.unquote(parsed_url.path))
 
        args = {
 
           'user': '',
 
           'pass': '',
 
           'scheme': parsed_url.scheme,
 
           'netloc': parsed_url.netloc,
 
           'prefix': decoded_path,
 
           'path': self.repo_name
 
        }
 

	
 
        args.update(override)
 
        return default_clone_uri % args
 

	
 
    #==========================================================================
 
    # SCM PROPERTIES
 
    #==========================================================================
 

	
 
    def get_changeset(self, rev=None):
 
        return get_changeset_safe(self.scm_instance, rev)
 

	
 
    def get_landing_changeset(self):
 
        """
 
        Returns landing changeset, or if that doesn't exist returns the tip
 
        """
 
        cs = self.get_changeset(self.landing_rev) or self.get_changeset()
 
        return cs
 

	
 
    def update_changeset_cache(self, cs_cache=None):
 
        """
 
        Update cache of last changeset for repository, keys should be::
 

	
 
            short_id
 
            raw_id
 
            revision
 
            message
 
            date
 
            author
 

	
 
        :param cs_cache:
 
        """
 
        from rhodecode.lib.vcs.backends.base import BaseChangeset
 
        if cs_cache is None:
 
            cs_cache = self.get_changeset()
 
        if isinstance(cs_cache, BaseChangeset):
 
            cs_cache = cs_cache.__json__()
 

	
 
        if (cs_cache != self.changeset_cache
 
            or not self.last_change
 
            or not self.changeset_cache):
 
            _default = datetime.datetime.fromtimestamp(0)
 
            last_change = cs_cache.get('date') or self.last_change or _default
 
            log.debug('updated repo %s with new cs cache %s' % (self, cs_cache))
 
            self.updated_on = last_change
 
            self.changeset_cache = cs_cache
 
            Session().add(self)
 
            Session().commit()
 
        else:
 
            log.debug('Skipping repo:%s already with latest changes' % self)
 

	
 
    @property
 
    def tip(self):
 
        return self.get_changeset('tip')
 

	
 
    @property
 
    def author(self):
 
        return self.tip.author
 

	
 
    @property
 
    def last_change(self):
 
        return self.scm_instance.last_change
 

	
 
    def get_comments(self, revisions=None):
 
        """
 
        Returns comments for this repository grouped by revisions
 

	
 
        :param revisions: filter query by revisions only
 
        """
 
        cmts = ChangesetComment.query()\
 
            .filter(ChangesetComment.repo == self)
 
        if revisions:
 
            cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
 
        grouped = defaultdict(list)
 
        for cmt in cmts.all():
 
            grouped[cmt.revision].append(cmt)
 
        return grouped
 

	
 
    def statuses(self, revisions=None):
 
        """
 
        Returns statuses for this repository
 

	
 
        :param revisions: list of revisions to get statuses for
 
        :type revisions: list
 
        """
 

	
 
        statuses = ChangesetStatus.query()\
 
            .filter(ChangesetStatus.repo == self)\
 
            .filter(ChangesetStatus.version == 0)
 
        if revisions:
 
            statuses = statuses.filter(ChangesetStatus.revision.in_(revisions))
 
        grouped = {}
 

	
 
        #maybe we have open new pullrequest without a status ?
 
        stat = ChangesetStatus.STATUS_UNDER_REVIEW
 
        status_lbl = ChangesetStatus.get_status_lbl(stat)
 
        for pr in PullRequest.query().filter(PullRequest.org_repo == self).all():
 
            for rev in pr.revisions:
 
                pr_id = pr.pull_request_id
 
                pr_repo = pr.other_repo.repo_name
 
                grouped[rev] = [stat, status_lbl, pr_id, pr_repo]
 

	
 
        for stat in statuses.all():
 
            pr_id = pr_repo = None
 
            if stat.pull_request:
 
                pr_id = stat.pull_request.pull_request_id
 
                pr_repo = stat.pull_request.other_repo.repo_name
 
            grouped[stat.revision] = [str(stat.status), stat.status_lbl,
 
                                      pr_id, pr_repo]
 
        return grouped
 

	
 
    #==========================================================================
 
    # SCM CACHE INSTANCE
 
    #==========================================================================
 

	
 
    @property
 
    def invalidate(self):
 
        return CacheInvalidation.invalidate(self.repo_name)
 

	
 
    def set_invalidate(self):
 
        """
 
        set a cache for invalidation for this instance
 
        """
 
        CacheInvalidation.set_invalidate(repo_name=self.repo_name)
 

	
 
    @LazyProperty
 
    def scm_instance_no_cache(self):
 
        return self.__get_instance()
 

	
 
    @LazyProperty
 
    def scm_instance(self):
 
        import rhodecode
 
        full_cache = str2bool(rhodecode.CONFIG.get('vcs_full_cache'))
 
        if full_cache:
 
            return self.scm_instance_cached()
 
        return self.__get_instance()
 

	
 
    def scm_instance_cached(self, cache_map=None):
 
        @cache_region('long_term')
 
        def _c(repo_name):
 
            return self.__get_instance()
 
        rn = self.repo_name
 
        log.debug('Getting cached instance of repo')
 

	
 
        if cache_map:
 
            # get using prefilled cache_map
 
            invalidate_repo = cache_map[self.repo_name]
 
            if invalidate_repo:
 
                invalidate_repo = (None if invalidate_repo.cache_active
 
                                   else invalidate_repo)
 
        else:
 
            # get from invalidate
 
            invalidate_repo = self.invalidate
 

	
 
        if invalidate_repo is not None:
 
            region_invalidate(_c, None, rn)
 
            # update our cache
 
            CacheInvalidation.set_valid(invalidate_repo.cache_key)
 
        return _c(rn)
 

	
 
    def __get_instance(self):
 
        repo_full_path = self.repo_full_path
 
        try:
 
            alias = get_scm(repo_full_path)[0]
 
            log.debug('Creating instance of %s repository' % alias)
 
            backend = get_backend(alias)
 
        except VCSError:
 
            log.error(traceback.format_exc())
 
            log.error('Perhaps this repository is in db and not in '
 
                      'filesystem run rescan repositories with '
 
                      '"destroy old data " option from admin panel')
 
            return
 

	
 
        if alias == 'hg':
 

	
 
            repo = backend(safe_str(repo_full_path), create=False,
 
                           baseui=self._ui)
 
            # skip hidden web repository
 
            if repo._get_hidden():
 
                return
 
        else:
 
            repo = backend(repo_full_path, create=False)
 

	
 
        return repo
 

	
 

	
 
class RepoGroup(Base, BaseModel):
 
    __tablename__ = 'groups'
 
    __table_args__ = (
 
        UniqueConstraint('group_name', 'group_parent_id'),
 
        CheckConstraint('group_id != group_parent_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 
    __mapper_args__ = {'order_by': 'group_name'}
 

	
 
    group_id = Column("group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    group_name = Column("group_name", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
 
    group_parent_id = Column("group_parent_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=None, default=None)
 
    group_description = Column("group_description", String(10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    enable_locking = Column("enable_locking", Boolean(), nullable=False, unique=None, default=False)
 

	
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', cascade='all', order_by='UserRepoGroupToPerm.group_to_perm_id')
 
    users_group_to_perm = relationship('UsersGroupRepoGroupToPerm', cascade='all')
 

	
 
    parent_group = relationship('RepoGroup', remote_side=group_id)
 

	
 
    def __init__(self, group_name='', parent_group=None):
 
        self.group_name = group_name
 
        self.parent_group = parent_group
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (self.__class__.__name__, self.group_id,
 
                                  self.group_name)
 

	
 
    @classmethod
 
    def groups_choices(cls, check_perms=False):
 
        from webhelpers.html import literal as _literal
 
        from rhodecode.model.scm import ScmModel
 
        groups = cls.query().all()
 
        if check_perms:
 
            #filter group user have access to, it's done
 
            #magically inside ScmModel based on current user
 
            groups = ScmModel().get_repos_groups(groups)
 
        repo_groups = [('', '')]
 
        sep = ' &raquo; '
 
        _name = lambda k: _literal(sep.join(k))
 

	
 
        repo_groups.extend([(x.group_id, _name(x.full_path_splitted))
 
                              for x in groups])
 

	
 
        repo_groups = sorted(repo_groups, key=lambda t: t[1].split(sep)[0])
 
        return repo_groups
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
 
        if case_insensitive:
 
            gr = cls.query()\
 
                .filter(cls.group_name.ilike(group_name))
 
        else:
 
            gr = cls.query()\
 
                .filter(cls.group_name == group_name)
 
        if cache:
 
            gr = gr.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                            )
 
            )
 
        return gr.scalar()
 

	
 
    @property
 
    def parents(self):
 
        parents_recursion_limit = 5
 
        groups = []
 
        if self.parent_group is None:
 
            return groups
 
        cur_gr = self.parent_group
 
        groups.insert(0, cur_gr)
 
        cnt = 0
 
        while 1:
 
            cnt += 1
 
            gr = getattr(cur_gr, 'parent_group', None)
 
            cur_gr = cur_gr.parent_group
 
            if gr is None:
 
                break
 
            if cnt == parents_recursion_limit:
 
                # this will prevent accidental infinit loops
 
                log.error('group nested more than %s' %
 
                          parents_recursion_limit)
 
                break
 

	
 
            groups.insert(0, gr)
 
        return groups
 

	
 
    @property
 
    def children(self):
 
        return RepoGroup.query().filter(RepoGroup.parent_group == self)
 

	
 
    @property
 
    def name(self):
 
        return self.group_name.split(RepoGroup.url_sep())[-1]
 

	
 
    @property
 
    def full_path(self):
 
        return self.group_name
 

	
 
    @property
 
    def full_path_splitted(self):
 
        return self.group_name.split(RepoGroup.url_sep())
 

	
 
    @property
 
    def repositories(self):
 
        return Repository.query()\
 
                .filter(Repository.group == self)\
 
                .order_by(Repository.repo_name)
 

	
 
    @property
 
    def repositories_recursive_count(self):
 
        cnt = self.repositories.count()
 

	
 
        def children_count(group):
 
            cnt = 0
 
            for child in group.children:
 
                cnt += child.repositories.count()
 
                cnt += children_count(child)
 
            return cnt
 

	
 
        return cnt + children_count(self)
 

	
 
    def recursive_groups_and_repos(self):
 
        """
 
        Recursive return all groups, with repositories in those groups
 
        """
 
    def _recursive_objects(self, include_repos=True):
 
        all_ = []
 

	
 
        def _get_members(root_gr):
 
            for r in root_gr.repositories:
 
                all_.append(r)
 
            if include_repos:
 
                for r in root_gr.repositories:
 
                    all_.append(r)
 
            childs = root_gr.children.all()
 
            if childs:
 
                for gr in childs:
 
                    all_.append(gr)
 
                    _get_members(gr)
 

	
 
        _get_members(self)
 
        return [self] + all_
 

	
 
    def recursive_groups_and_repos(self):
 
        """
 
        Recursive return all groups, with repositories in those groups
 
        """
 
        return self._recursive_objects()
 

	
 
    def recursive_groups(self):
 
        """
 
        Returns all children groups for this group including children of children 
 
        """
 
        return self._recursive_objects(include_repos=False)
 

	
 
    def get_new_name(self, group_name):
 
        """
 
        returns new full group name based on parent and new name
 

	
 
        :param group_name:
 
        """
 
        path_prefix = (self.parent_group.full_path_splitted if
 
                       self.parent_group else [])
 
        return RepoGroup.url_sep().join(path_prefix + [group_name])
 

	
 

	
 
class Permission(Base, BaseModel):
 
    __tablename__ = 'permissions'
 
    __table_args__ = (
 
        Index('p_perm_name_idx', 'permission_name'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 
    PERMS = [
 
        ('repository.none', _('Repository no access')),
 
        ('repository.read', _('Repository read access')),
 
        ('repository.write', _('Repository write access')),
 
        ('repository.admin', _('Repository admin access')),
 

	
 
        ('group.none', _('Repositories Group no access')),
 
        ('group.read', _('Repositories Group read access')),
 
        ('group.write', _('Repositories Group write access')),
 
        ('group.admin', _('Repositories Group admin access')),
 

	
 
        ('hg.admin', _('RhodeCode Administrator')),
 
        ('hg.create.none', _('Repository creation disabled')),
 
        ('hg.create.repository', _('Repository creation enabled')),
 
        ('hg.fork.none', _('Repository forking disabled')),
 
        ('hg.fork.repository', _('Repository forking enabled')),
 
        ('hg.register.none', _('Register disabled')),
 
        ('hg.register.manual_activate', _('Register new user with RhodeCode '
 
                                          'with manual activation')),
 

	
 
        ('hg.register.auto_activate', _('Register new user with RhodeCode '
 
                                        'with auto activation')),
 
    ]
 

	
 
    # defines which permissions are more important higher the more important
 
    PERM_WEIGHTS = {
 
        'repository.none': 0,
 
        'repository.read': 1,
 
        'repository.write': 3,
 
        'repository.admin': 4,
 

	
 
        'group.none': 0,
 
        'group.read': 1,
 
        'group.write': 3,
 
        'group.admin': 4,
 

	
 
        'hg.fork.none': 0,
 
        'hg.fork.repository': 1,
 
        'hg.create.none': 0,
 
        'hg.create.repository':1
 
    }
 

	
 
    permission_id = Column("permission_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    permission_name = Column("permission_name", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    permission_longname = Column("permission_longname", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (
 
            self.__class__.__name__, self.permission_id, self.permission_name
 
        )
 

	
 
    @classmethod
 
    def get_by_key(cls, key):
 
        return cls.query().filter(cls.permission_name == key).scalar()
 

	
 
    @classmethod
 
    def get_default_perms(cls, default_user_id):
 
        q = Session().query(UserRepoToPerm, Repository, cls)\
 
         .join((Repository, UserRepoToPerm.repository_id == Repository.repo_id))\
 
         .join((cls, UserRepoToPerm.permission_id == cls.permission_id))\
 
         .filter(UserRepoToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 
    @classmethod
 
    def get_default_group_perms(cls, default_user_id):
 
        q = Session().query(UserRepoGroupToPerm, RepoGroup, cls)\
 
         .join((RepoGroup, UserRepoGroupToPerm.group_id == RepoGroup.group_id))\
 
         .join((cls, UserRepoGroupToPerm.permission_id == cls.permission_id))\
 
         .filter(UserRepoGroupToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 

	
 
class UserRepoToPerm(Base, BaseModel):
 
    __tablename__ = 'repo_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'repository_id', 'permission_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    repo_to_perm_id = Column("repo_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 
    repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, user, repository, permission):
 
        n = cls()
 
        n.user = user
 
        n.repository = repository
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<user:%s => %s >' % (self.user, self.repository)
 

	
 

	
 
class UserToPerm(Base, BaseModel):
 
    __tablename__ = 'user_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'permission_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    user_to_perm_id = Column("user_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 

	
 
    user = relationship('User')
 
    permission = relationship('Permission', lazy='joined')
 

	
 

	
 
class UsersGroupRepoToPerm(Base, BaseModel):
 
    __tablename__ = 'users_group_repo_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'users_group_id', 'permission_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    users_group_to_perm_id = Column("users_group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 
    repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
 

	
 
    users_group = relationship('UsersGroup')
 
    permission = relationship('Permission')
 
    repository = relationship('Repository')
 

	
 
    @classmethod
 
    def create(cls, users_group, repository, permission):
 
        n = cls()
 
        n.users_group = users_group
 
        n.repository = repository
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<userGroup:%s => %s >' % (self.users_group, self.repository)
 

	
 

	
 
class UsersGroupToPerm(Base, BaseModel):
 
    __tablename__ = 'users_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('users_group_id', 'permission_id',),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    users_group_to_perm_id = Column("users_group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 

	
 
    users_group = relationship('UsersGroup')
 
    permission = relationship('Permission')
 

	
 

	
 
class UserRepoGroupToPerm(Base, BaseModel):
 
    __tablename__ = 'user_repo_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'group_id', 'permission_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 

	
 
    group_to_perm_id = Column("group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
 
    group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 

	
 
    user = relationship('User')
 
    group = relationship('RepoGroup')
 
    permission = relationship('Permission')
 

	
 

	
 
class UsersGroupRepoGroupToPerm(Base, BaseModel):
 
    __tablename__ = 'users_group_repo_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('users_group_id', 'group_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 

	
 
    users_group_repo_group_to_perm_id = Column("users_group_repo_group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
 
    group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=False, unique=None, default=None)
 
    permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
 

	
 
    users_group = relationship('UsersGroup')
 
    permission = relationship('Permission')
 
    group = relationship('RepoGroup')
 

	
 

	
 
class Statistics(Base, BaseModel):
 
    __tablename__ = 'statistics'
 
    __table_args__ = (
 
         UniqueConstraint('repository_id'),
 
         {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
          'mysql_charset': 'utf8'}
 
    )
 
    stat_id = Column("stat_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=True, default=None)
 
    stat_on_revision = Column("stat_on_revision", Integer(), nullable=False)
 
    commit_activity = Column("commit_activity", LargeBinary(1000000), nullable=False)#JSON data
 
    commit_activity_combined = Column("commit_activity_combined", LargeBinary(), nullable=False)#JSON data
 
    languages = Column("languages", LargeBinary(1000000), nullable=False)#JSON data
 

	
 
    repository = relationship('Repository', single_parent=True)
 

	
 

	
 
class UserFollowing(Base, BaseModel):
 
    __tablename__ = 'user_followings'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'follows_repository_id'),
 
        UniqueConstraint('user_id', 'follows_user_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 

	
 
    user_following_id = Column("user_following_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
 
    follows_repo_id = Column("follows_repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=None, default=None)
 
    follows_user_id = Column("follows_user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
 
    follows_from = Column('follows_from', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
 

	
 
    user = relationship('User', primaryjoin='User.user_id==UserFollowing.user_id')
 

	
 
    follows_user = relationship('User', primaryjoin='User.user_id==UserFollowing.follows_user_id')
 
    follows_repository = relationship('Repository', order_by='Repository.repo_name')
 

	
 
    @classmethod
 
    def get_repo_followers(cls, repo_id):
 
        return cls.query().filter(cls.follows_repo_id == repo_id)
 

	
 

	
 
class CacheInvalidation(Base, BaseModel):
 
    __tablename__ = 'cache_invalidation'
 
    __table_args__ = (
 
        UniqueConstraint('cache_key'),
 
        Index('key_idx', 'cache_key'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 
    cache_id = Column("cache_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    cache_key = Column("cache_key", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    cache_args = Column("cache_args", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False)
 

	
 
    def __init__(self, cache_key, cache_args=''):
 
        self.cache_key = cache_key
 
        self.cache_args = cache_args
 
        self.cache_active = False
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (self.__class__.__name__,
 
                                  self.cache_id, self.cache_key)
 

	
 
    @property
 
    def prefix(self):
 
        _split = self.cache_key.split(self.cache_args, 1)
 
        if _split and len(_split) == 2:
 
            return _split[0]
 
        return ''
 

	
 
    @classmethod
 
    def clear_cache(cls):
 
        cls.query().delete()
 

	
 
    @classmethod
 
    def _get_key(cls, key):
 
        """
 
        Wrapper for generating a key, together with a prefix
 

	
 
        :param key:
 
        """
 
        import rhodecode
 
        prefix = ''
 
        org_key = key
 
        iid = rhodecode.CONFIG.get('instance_id')
 
        if iid:
 
            prefix = iid
 

	
 
        return "%s%s" % (prefix, key), prefix, org_key
 

	
 
    @classmethod
 
    def get_by_key(cls, key):
 
        return cls.query().filter(cls.cache_key == key).scalar()
 

	
 
    @classmethod
 
    def get_by_repo_name(cls, repo_name):
 
        return cls.query().filter(cls.cache_args == repo_name).all()
 

	
 
    @classmethod
 
    def _get_or_create_key(cls, key, repo_name, commit=True):
 
        inv_obj = Session().query(cls).filter(cls.cache_key == key).scalar()
 
        if not inv_obj:
 
            try:
 
                inv_obj = CacheInvalidation(key, repo_name)
 
                Session().add(inv_obj)
 
                if commit:
 
                    Session().commit()
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                Session().rollback()
 
        return inv_obj
 

	
 
    @classmethod
 
    def invalidate(cls, key):
 
        """
 
        Returns Invalidation object if this given key should be invalidated
 
        None otherwise. `cache_active = False` means that this cache
 
        state is not valid and needs to be invalidated
 

	
 
        :param key:
 
        """
 
        repo_name = key
 
        repo_name = remove_suffix(repo_name, '_README')
 
        repo_name = remove_suffix(repo_name, '_RSS')
 
        repo_name = remove_suffix(repo_name, '_ATOM')
 

	
 
        # adds instance prefix
 
        key, _prefix, _org_key = cls._get_key(key)
 
        inv = cls._get_or_create_key(key, repo_name)
 

	
 
        if inv and inv.cache_active is False:
 
            return inv
 

	
 
    @classmethod
 
    def set_invalidate(cls, key=None, repo_name=None):
 
        """
 
        Mark this Cache key for invalidation, either by key or whole
 
        cache sets based on repo_name
 

	
 
        :param key:
 
        """
 
        if key:
 
            key, _prefix, _org_key = cls._get_key(key)
 
            inv_objs = Session().query(cls).filter(cls.cache_key == key).all()
 
        elif repo_name:
 
            inv_objs = Session().query(cls).filter(cls.cache_args == repo_name).all()
 

	
 
        log.debug('marking %s key[s] for invalidation based on key=%s,repo_name=%s'
 
                  % (len(inv_objs), key, repo_name))
 
        try:
 
            for inv_obj in inv_objs:
 
                inv_obj.cache_active = False
 
                Session().add(inv_obj)
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            Session().rollback()
 

	
 
    @classmethod
 
    def set_valid(cls, key):
 
        """
 
        Mark this cache key as active and currently cached
 

	
 
        :param key:
 
        """
 
        inv_obj = cls.get_by_key(key)
 
        inv_obj.cache_active = True
rhodecode/model/repos_group.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.model.user_group
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    users groups model for RhodeCode
 

	
 
    :created_on: Jan 25, 2011
 
    :author: marcink
 
    :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import os
 
import logging
 
import traceback
 
import shutil
 
import datetime
 

	
 
from rhodecode.lib.utils2 import LazyProperty
 

	
 
from rhodecode.model import BaseModel
 
from rhodecode.model.db import RepoGroup, RhodeCodeUi, UserRepoGroupToPerm, \
 
    User, Permission, UsersGroupRepoGroupToPerm, UsersGroup, Repository
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ReposGroupModel(BaseModel):
 

	
 
    cls = RepoGroup
 

	
 
    def __get_users_group(self, users_group):
 
        return self._get_instance(UsersGroup, users_group,
 
                                  callback=UsersGroup.get_by_group_name)
 

	
 
    def _get_repos_group(self, repos_group):
 
        return self._get_instance(RepoGroup, repos_group,
 
                                  callback=RepoGroup.get_by_group_name)
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Get's the repositories root path from database
 
        """
 

	
 
        q = RhodeCodeUi.get_by_key('/')
 
        return q.ui_value
 

	
 
    def _create_default_perms(self, new_group):
 
        # create default permission
 
        repo_group_to_perm = UserRepoGroupToPerm()
 
        default_perm = 'group.read'
 
        for p in User.get_by_username('default').user_perms:
 
            if p.permission.permission_name.startswith('group.'):
 
                default_perm = p.permission.permission_name
 
                break
 

	
 
        repo_group_to_perm.permission_id = self.sa.query(Permission)\
 
                .filter(Permission.permission_name == default_perm)\
 
                .one().permission_id
 

	
 
        repo_group_to_perm.group = new_group
 
        repo_group_to_perm.user_id = User.get_by_username('default').user_id
 

	
 
        self.sa.add(repo_group_to_perm)
 

	
 
    def __create_group(self, group_name):
 
        """
 
        makes repositories group on filesystem
 

	
 
        :param repo_name:
 
        :param parent_id:
 
        """
 

	
 
        create_path = os.path.join(self.repos_path, group_name)
 
        log.debug('creating new group in %s' % create_path)
 

	
 
        if os.path.isdir(create_path):
 
            raise Exception('That directory already exists !')
 

	
 
        os.makedirs(create_path)
 

	
 
    def __rename_group(self, old, new):
 
        """
 
        Renames a group on filesystem
 

	
 
        :param group_name:
 
        """
 

	
 
        if old == new:
 
            log.debug('skipping group rename')
 
            return
 

	
 
        log.debug('renaming repos group from %s to %s' % (old, new))
 

	
 
        old_path = os.path.join(self.repos_path, old)
 
        new_path = os.path.join(self.repos_path, new)
 

	
 
        log.debug('renaming repos paths from %s to %s' % (old_path, new_path))
 

	
 
        if os.path.isdir(new_path):
 
            raise Exception('Was trying to rename to already '
 
                            'existing dir %s' % new_path)
 
        shutil.move(old_path, new_path)
 

	
 
    def __delete_group(self, group, force_delete=False):
 
        """
 
        Deletes a group from a filesystem
 

	
 
        :param group: instance of group from database
 
        :param force_delete: use shutil rmtree to remove all objects
 
        """
 
        paths = group.full_path.split(RepoGroup.url_sep())
 
        paths = os.sep.join(paths)
 

	
 
        rm_path = os.path.join(self.repos_path, paths)
 
        log.info("Removing group %s" % (rm_path))
 
        # delete only if that path really exists
 
        if os.path.isdir(rm_path):
 
            if force_delete:
 
                shutil.rmtree(rm_path)
 
            else:
 
                #archive that group`
 
                _now = datetime.datetime.now()
 
                _ms = str(_now.microsecond).rjust(6, '0')
 
                _d = 'rm__%s_GROUP_%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                                          group.name)
 
                shutil.move(rm_path, os.path.join(self.repos_path, _d))
 

	
 
    def create(self, group_name, group_description, parent=None, just_db=False):
 
        try:
 
            new_repos_group = RepoGroup()
 
            new_repos_group.group_description = group_description
 
            new_repos_group.parent_group = self._get_repos_group(parent)
 
            new_repos_group.group_name = new_repos_group.get_new_name(group_name)
 

	
 
            self.sa.add(new_repos_group)
 
            self._create_default_perms(new_repos_group)
 

	
 
            if not just_db:
 
                # we need to flush here, in order to check if database won't
 
                # throw any exceptions, create filesystem dirs at the very end
 
                self.sa.flush()
 
                self.__create_group(new_repos_group.group_name)
 

	
 
            return new_repos_group
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _update_permissions(self, repos_group, perms_new=None,
 
                            perms_updates=None, recursive=False):
 
        from rhodecode.model.repo import RepoModel
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        def _set_perm_user(obj, user, perm):
 
            if isinstance(obj, RepoGroup):
 
                ReposGroupModel().grant_user_permission(
 
                    repos_group=obj, user=user, perm=perm
 
                )
 
            elif isinstance(obj, Repository):
 
                #we do this ONLY IF repository is non-private
 
                if obj.private:
 
                    return
 

	
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_user_permission(
 
                    repo=obj, user=user, perm=perm
 
                )
 

	
 
        def _set_perm_group(obj, users_group, perm):
 
            if isinstance(obj, RepoGroup):
 
                ReposGroupModel().grant_users_group_permission(
 
                    repos_group=obj, group_name=users_group, perm=perm
 
                )
 
            elif isinstance(obj, Repository):
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_users_group_permission(
 
                    repo=obj, group_name=users_group, perm=perm
 
                )
 
        updates = []
 
        log.debug('Now updating permissions for %s in recursive mode:%s'
 
                  % (repos_group, recursive))
 

	
 
        for obj in repos_group.recursive_groups_and_repos():
 
            #obj is an instance of a group or repositories in that group
 
            if not recursive:
 
                obj = repos_group
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for users group
 
                else:
 
                    _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            #if it's not recursive call
 
            # break the loop and don't proceed with other changes
 
            if not recursive:
 
                break
 
        return updates
 

	
 
    def update(self, repos_group_id, form_data):
 

	
 
        try:
 
            repos_group = RepoGroup.get(repos_group_id)
 
            recursive = form_data['recursive']
 
            # iterate over all members(if in recursive mode) of this groups and
 
            # set the permissions !
 
            # this can be potentially heavy operation
 
            self._update_permissions(repos_group, form_data['perms_new'],
 
                                     form_data['perms_updates'], recursive)
 

	
 
            old_path = repos_group.full_path
 

	
 
            # change properties
 
            repos_group.group_description = form_data['group_description']
 
            repos_group.parent_group = RepoGroup.get(form_data['group_parent_id'])
 
            repos_group.group_parent_id = form_data['group_parent_id']
 
            repos_group.enable_locking = form_data['enable_locking']
 

	
 
            repos_group.parent_group = RepoGroup.get(form_data['group_parent_id'])
 
            repos_group.group_name = repos_group.get_new_name(form_data['group_name'])
 
            new_path = repos_group.full_path
 

	
 
            self.sa.add(repos_group)
 

	
 
            # iterate over all members of this groups and set the locking !
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
 
            # to the parent
 
            # if obj is a Repo fix it's name
 
            # this can be potentially heavy operation
 
            for obj in repos_group.recursive_groups_and_repos():
 
                #set the value from it's parent
 
                obj.enable_locking = repos_group.enable_locking
 
                if isinstance(obj, RepoGroup):
 
                    new_name = obj.get_new_name(obj.name)
 
                    log.debug('Fixing group %s to new name %s' \
 
                                % (obj.group_name, new_name))
 
                    obj.group_name = new_name
 
                elif isinstance(obj, Repository):
 
                    # we need to get all repositories from this new group and
 
                    # rename them accordingly to new group path
 
                    new_name = obj.get_new_name(obj.just_name)
 
                    log.debug('Fixing repo %s to new name %s' \
 
                                % (obj.repo_name, new_name))
 
                    obj.repo_name = new_name
 
                self.sa.add(obj)
 

	
 
            # we need to get all repositories from this new group and
 
            # rename them accordingly to new group path
 
            for r in repos_group.repositories:
 
                r.repo_name = r.get_new_name(r.just_name)
 
                self.sa.add(r)
 

	
 
            self.__rename_group(old_path, new_path)
 

	
 
            return repos_group
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, repos_group, force_delete=False):
 
        repos_group = self._get_repos_group(repos_group)
 
        try:
 
            self.sa.delete(repos_group)
 
            self.__delete_group(repos_group, force_delete)
 
        except:
 
            log.error('Error removing repos_group %s' % repos_group)
 
            raise
 

	
 
    def delete_permission(self, repos_group, obj, obj_type, recursive):
 
        """
 
        Revokes permission for repos_group for given obj(user or users_group),
 
        obj_type can be user or users group
 

	
 
        :param repos_group:
 
        :param obj: user or users group id
 
        :param obj_type: user or users group type
 
        :param recursive: recurse to all children of group
 
        """
 
        from rhodecode.model.repo import RepoModel
 
        repos_group = self._get_repos_group(repos_group)
 

	
 
        for el in repos_group.recursive_groups_and_repos():
 
            if not recursive:
 
                # if we don't recurse set the permission on only the top level
 
                # object
 
                el = repos_group
 

	
 
            if isinstance(el, RepoGroup):
 
                if obj_type == 'user':
 
                    ReposGroupModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'users_group':
 
                    ReposGroupModel().revoke_users_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            elif isinstance(el, Repository):
 
                if obj_type == 'user':
 
                    RepoModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'users_group':
 
                    RepoModel().revoke_users_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 

	
 
            #if it's not recursive call
 
            # break the loop and don't proceed with other changes
 
            if not recursive:
 
                break
 

	
 
    def grant_user_permission(self, repos_group, user, perm):
 
        """
 
        Grant permission for user on given repositories group, or update
 
        existing one if found
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 

	
 
        repos_group = self._get_repos_group(repos_group)
 
        user = self._get_user(user)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserRepoGroupToPerm)\
 
            .filter(UserRepoGroupToPerm.user == user)\
 
            .filter(UserRepoGroupToPerm.group == repos_group)\
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserRepoGroupToPerm()
 
        obj.group = repos_group
 
        obj.user = user
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, user, repos_group))
 

	
 
    def revoke_user_permission(self, repos_group, user):
 
        """
 
        Revoke permission for user on given repositories group
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        repos_group = self._get_repos_group(repos_group)
 
        user = self._get_user(user)
 

	
 
        obj = self.sa.query(UserRepoGroupToPerm)\
 
            .filter(UserRepoGroupToPerm.user == user)\
 
            .filter(UserRepoGroupToPerm.group == repos_group)\
 
            .scalar()
 
        if obj:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm on %s on %s' % (repos_group, user))
 

	
 
    def grant_users_group_permission(self, repos_group, group_name, perm):
 
        """
 
        Grant permission for users group on given repositories group, or update
 
        existing one if found
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        repos_group = self._get_repos_group(repos_group)
 
        group_name = self.__get_users_group(group_name)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UsersGroupRepoGroupToPerm)\
 
            .filter(UsersGroupRepoGroupToPerm.group == repos_group)\
 
            .filter(UsersGroupRepoGroupToPerm.users_group == group_name)\
 
            .scalar()
 

	
 
        if obj is None:
 
            # create new
 
            obj = UsersGroupRepoGroupToPerm()
 

	
 
        obj.group = repos_group
 
        obj.users_group = group_name
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, group_name, repos_group))
 

	
 
    def revoke_users_group_permission(self, repos_group, group_name):
 
        """
 
        Revoke permission for users group on given repositories group
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
        """
 
        repos_group = self._get_repos_group(repos_group)
 
        group_name = self.__get_users_group(group_name)
 

	
 
        obj = self.sa.query(UsersGroupRepoGroupToPerm)\
 
            .filter(UsersGroupRepoGroupToPerm.group == repos_group)\
 
            .filter(UsersGroupRepoGroupToPerm.users_group == group_name)\
 
            .scalar()
 
        if obj:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm to %s on %s' % (repos_group, group_name))
rhodecode/tests/__init__.py
Show inline comments
 
"""Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 
"""
 
import os
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from unittest import TestCase
 
from tempfile import _RandomNameSequence
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 

	
 
from rhodecode import is_windows
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User
 
from rhodecode.tests.nose_parametrized import parameterized
 

	
 
import pylons.test
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS', '_get_repo_create_params'
 
    'GIT_REMOTE_REPO', 'SCM_TESTS', '_get_repo_create_params',
 
    '_get_group_create_params'
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
# nosetests --pdb --pdb-failures
 
# nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from rhodecode.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.Session = Session
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['rhodecode_user']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['rhodecode_user']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def checkSessionFlash(self, response, msg):
 
        self.assertTrue('flash' in response.session)
 
        if not msg in response.session['flash'][0][1]:
 
            self.fail(
 
                'msg `%s` not found in session flash: got `%s` instead' % (
 
                      msg, response.session['flash'])
 
            )
 

	
 

	
 
## HELPERS ##
 

	
 
def _get_repo_create_params(**custom):
 
    defs = {
 
        'repo_name': None,
 
        'repo_type': 'hg',
 
        'clone_uri': '',
 
        'repo_group': '',
 
        'repo_description': 'DESC',
 
        'repo_private': False,
 
        'repo_landing_rev': 'tip'
 
    }
 
    defs.update(custom)
 
    if 'repo_name_full' not in custom:
 
        defs.update({'repo_name_full': defs['repo_name']})
 

	
 
    return defs
 

	
 

	
 
def _get_group_create_params(**custom):
 
    defs = dict(
 
        group_name=None,
 
        group_description='DESC',
 
        group_parent_id=None,
 
        perms_updates=[],
 
        perms_new=[],
 
        enable_locking=False,
 
        recursive=False
 
    )
 
    defs.update(custom)
 

	
 
    return defs
rhodecode/tests/models/test_repos_groups.py
Show inline comments
 
import os
 
import unittest
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User
 
from rhodecode.model.meta import Session
 
from sqlalchemy.exc import IntegrityError
 

	
 

	
 
def _make_group(path, desc='desc', parent_id=None,
 
                 skip_if_exists=False):
 

	
 
    gr = RepoGroup.get_by_group_name(path)
 
    if gr and skip_if_exists:
 
        return gr
 
    if isinstance(parent_id, RepoGroup):
 
        parent_id = parent_id.group_id
 
    gr = ReposGroupModel().create(path, desc, parent_id)
 
    return gr
 

	
 

	
 
def _update_group(id_, group_name, desc='desc', parent_id=None):
 
    form_data = _get_group_create_params(group_name=group_name,
 
                                         group_desc=desc,
 
                                         group_parent_id=parent_id)
 
    gr = ReposGroupModel().update(id_, form_data)
 
    return gr
 

	
 

	
 
def _make_repo(name, **kwargs):
 
    form_data = _get_repo_create_params(repo_name=name, **kwargs)
 
    cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
    r = RepoModel().create(form_data, cur_user)
 
    return r
 

	
 

	
 
def _update_repo(name, **kwargs):
 
    form_data = _get_repo_create_params(**kwargs)
 
    if not 'repo_name' in kwargs:
 
        form_data['repo_name'] = name
 
    if not 'perms_new' in kwargs:
 
        form_data['perms_new'] = []
 
    if not 'perms_updates' in kwargs:
 
        form_data['perms_updates'] = []
 
    r = RepoModel().update(name, **form_data)
 
    return r
 

	
 

	
 
class TestReposGroups(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.g1 = _make_group('test1', skip_if_exists=True)
 
        Session().commit()
 
        self.g2 = _make_group('test2', skip_if_exists=True)
 
        Session().commit()
 
        self.g3 = _make_group('test3', skip_if_exists=True)
 
        Session().commit()
 

	
 
    def tearDown(self):
 
        print 'out'
 
        Session.remove()
 

	
 
    def __check_path(self, *path):
 
        """
 
        Checks the path for existance !
 
        """
 
        path = [TESTS_TMP_PATH] + list(path)
 
        path = os.path.join(*path)
 
        return os.path.isdir(path)
 

	
 
    def _check_folders(self):
 
        print os.listdir(TESTS_TMP_PATH)
 

	
 
    def __delete_group(self, id_):
 
        ReposGroupModel().delete(id_)
 

	
 
    def __update_group(self, id_, path, desc='desc', parent_id=None):
 
        form_data = dict(
 
            group_name=path,
 
            group_description=desc,
 
            group_parent_id=parent_id,
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        gr = ReposGroupModel().update(id_, form_data)
 
        return gr
 

	
 
    def test_create_group(self):
 
        g = _make_group('newGroup')
 
        Session().commit()
 
        self.assertEqual(g.full_path, 'newGroup')
 

	
 
        self.assertTrue(self.__check_path('newGroup'))
 

	
 
    def test_create_same_name_group(self):
 
        self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
 
        Session().rollback()
 

	
 
    def test_same_subgroup(self):
 
        sg1 = _make_group('sub1', parent_id=self.g1.group_id)
 
        Session().commit()
 
        self.assertEqual(sg1.parent_group, self.g1)
 
        self.assertEqual(sg1.full_path, 'test1/sub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1'))
 

	
 
        ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
 
        Session().commit()
 
        self.assertEqual(ssg1.parent_group, sg1)
 
        self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
 

	
 
    def test_remove_group(self):
 
        sg1 = _make_group('deleteme')
 
        Session().commit()
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('deteteme'))
 

	
 
        sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
 
        Session().commit()
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('test1', 'deteteme'))
 

	
 
    def test_rename_single_group(self):
 
        sg1 = _make_group('initial')
 
        Session().commit()
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after')
 
        new_sg1 = _update_group(sg1.group_id, 'after')
 
        self.assertTrue(self.__check_path('after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
 

	
 
    def test_update_group_parent(self):
 

	
 
        sg1 = _make_group('initial', parent_id=self.g1.group_id)
 
        Session().commit()
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
 
        new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
 
        self.assertTrue(self.__check_path('test1', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
 
        new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
 
        self.assertTrue(self.__check_path('test3', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'hello')
 
        new_sg1 = _update_group(sg1.group_id, 'hello')
 
        self.assertTrue(self.__check_path('hello'))
 

	
 
        self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
 

	
 
    def test_subgrouping_with_repo(self):
 

	
 
        g1 = _make_group('g1')
 
        g2 = _make_group('g2')
 

	
 
        Session().commit()
 
        # create new repo
 
        form_data = _get_repo_create_params(repo_name='john')
 
        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        r = RepoModel().create(form_data, cur_user)
 

	
 
        r = _make_repo('john')
 
        Session().commit()
 
        self.assertEqual(r.repo_name, 'john')
 

	
 
        # put repo into group
 
        form_data = form_data
 
        form_data['repo_group'] = g1.group_id
 
        form_data['perms_new'] = []
 
        form_data['perms_updates'] = []
 
        RepoModel().update(r.repo_name, **form_data)
 
        r = _update_repo('john', repo_group=g1.group_id)
 
        Session().commit()
 
        self.assertEqual(r.repo_name, 'g1/john')
 

	
 
        self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        _update_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        self.assertTrue(self.__check_path('g2', 'g1'))
 

	
 
        # test repo
 
        self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
 
                                                                r.just_name]))
 

	
 
    def test_move_to_root(self):
 
        g1 = _make_group('t11')
 
        Session().commit()
 
        g2 = _make_group('t22', parent_id=g1.group_id)
 
        Session().commit()
 

	
 
        self.assertEqual(g2.full_path, 't11/t22')
 
        self.assertTrue(self.__check_path('t11', 't22'))
 

	
 
        g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
 
        g2 = _update_group(g2.group_id, 'g22', parent_id=None)
 
        Session().commit()
 

	
 
        self.assertEqual(g2.group_name, 'g22')
 
        # we moved out group from t1 to '' so it's full path should be 'g2'
 
        self.assertEqual(g2.full_path, 'g22')
 
        self.assertFalse(self.__check_path('t11', 't22'))
 
        self.assertTrue(self.__check_path('g22'))
 

	
 
    def test_rename_top_level_group_in_nested_setup(self):
 
        g1 = _make_group('L1')
 
        Session().commit()
 
        g2 = _make_group('L2', parent_id=g1.group_id)
 
        Session().commit()
 
        g3 = _make_group('L3', parent_id=g2.group_id)
 
        Session().commit()
 

	
 
        r = _make_repo('L1/L2/L3/L3_REPO', repo_group=g3.group_id)
 
        Session().commit()
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, 'L1_NEW')
 
        Session().commit()
 
        self.assertEqual(g1.full_path, 'L1_NEW')
 
        self.assertEqual(g2.full_path, 'L1_NEW/L2')
 
        self.assertEqual(g3.full_path, 'L1_NEW/L2/L3')
 
        self.assertEqual(r.repo_name,  'L1_NEW/L2/L3/L3_REPO')
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup(self):
 
        g1 = _make_group('R1')
 
        Session().commit()
 
        g2 = _make_group('R2', parent_id=g1.group_id)
 
        Session().commit()
 
        g3 = _make_group('R3', parent_id=g2.group_id)
 
        Session().commit()
 

	
 
        g4 = _make_group('R1_NEW')
 
        Session().commit()
 

	
 
        r = _make_repo('R1/R2/R3/R3_REPO', repo_group=g3.group_id)
 
        Session().commit()
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, 'R1', parent_id=g4.group_id)
 
        Session().commit()
 
        self.assertEqual(g1.full_path, 'R1_NEW/R1')
 
        self.assertEqual(g2.full_path, 'R1_NEW/R1/R2')
 
        self.assertEqual(g3.full_path, 'R1_NEW/R1/R2/R3')
 
        self.assertEqual(r.repo_name,  'R1_NEW/R1/R2/R3/R3_REPO')
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup_with_rename(self):
 
        g1 = _make_group('X1')
 
        Session().commit()
 
        g2 = _make_group('X2', parent_id=g1.group_id)
 
        Session().commit()
 
        g3 = _make_group('X3', parent_id=g2.group_id)
 
        Session().commit()
 

	
 
        g4 = _make_group('X1_NEW')
 
        Session().commit()
 

	
 
        r = _make_repo('X1/X2/X3/X3_REPO', repo_group=g3.group_id)
 
        Session().commit()
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, 'X1_PRIM', parent_id=g4.group_id)
 
        Session().commit()
 
        self.assertEqual(g1.full_path, 'X1_NEW/X1_PRIM')
 
        self.assertEqual(g2.full_path, 'X1_NEW/X1_PRIM/X2')
 
        self.assertEqual(g3.full_path, 'X1_NEW/X1_PRIM/X2/X3')
 
        self.assertEqual(r.repo_name,  'X1_NEW/X1_PRIM/X2/X3/X3_REPO')
0 comments (0 inline, 0 general)