Changeset - 4136526cce20
[Not reviewed]
default
0 18 0
Søren Løvborg - 9 years ago 2016-09-13 19:19:59
sorenl@unity3d.com
db: remove superfluous Session.add calls

Don't re-add objects to the SQLAlchemy Session just because they were
modified. Session.add is only for freshly constructed objects that
SQLAlchemy doesn't know about yet.

The rules are quite simple:

When creating a database object by calling the constructor directly, it
must explicitly be added to the session.

When creating an object using a factory function (like "create_repo"),
the returned object has already (by convention) been added to the
session, and should not be added again.

When getting an object from the session (via Session.query or any of the
utility functions that look up objects in the database), it's already
added, and should not be added again. SQLAlchemy notices attribute
modifications automatically for all objects it knows about.
18 files changed with 5 insertions and 35 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/user_groups.py
Show inline comments
 
@@ -357,49 +357,48 @@ class UserGroupsController(BaseControlle
 
                                                  'hg.create.repository'),
 
            'create_user_group_perm': ug_model.has_perm(c.user_group,
 
                                                        'hg.usergroup.create.true'),
 
            'fork_repo_perm': ug_model.has_perm(c.user_group,
 
                                                'hg.fork.repository'),
 
        })
 

	
 
        return htmlfill.render(
 
            render('admin/user_groups/user_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
 
    def update_default_perms(self, id):
 
        user_group = UserGroup.get_or_404(id)
 

	
 
        try:
 
            form = CustomDefaultPermissionsForm()()
 
            form_result = form.to_python(request.POST)
 

	
 
            inherit_perms = form_result['inherit_default_permissions']
 
            user_group.inherit_default_permissions = inherit_perms
 
            Session().add(user_group)
 
            usergroup_model = UserGroupModel()
 

	
 
            defs = UserGroupToPerm.query() \
 
                .filter(UserGroupToPerm.users_group == user_group) \
 
                .all()
 
            for ug in defs:
 
                Session().delete(ug)
 

	
 
            if form_result['create_repo_perm']:
 
                usergroup_model.grant_perm(id, 'hg.create.repository')
 
            else:
 
                usergroup_model.grant_perm(id, 'hg.create.none')
 
            if form_result['create_user_group_perm']:
 
                usergroup_model.grant_perm(id, 'hg.usergroup.create.true')
 
            else:
 
                usergroup_model.grant_perm(id, 'hg.usergroup.create.false')
 
            if form_result['fork_repo_perm']:
 
                usergroup_model.grant_perm(id, 'hg.fork.repository')
 
            else:
 
                usergroup_model.grant_perm(id, 'hg.fork.none')
 

	
 
            h.flash(_("Updated permissions"), category='success')
 
            Session().commit()
 
        except Exception:
kallithea/controllers/admin/users.py
Show inline comments
 
@@ -305,49 +305,48 @@ class UsersController(BaseController):
 

	
 
        umodel = UserModel()
 
        defaults = c.user.get_dict()
 
        defaults.update({
 
            'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
 
            'create_user_group_perm': umodel.has_perm(c.user,
 
                                                      'hg.usergroup.create.true'),
 
            'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
 
        })
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def update_perms(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 

	
 
        try:
 
            form = CustomDefaultPermissionsForm()()
 
            form_result = form.to_python(request.POST)
 

	
 
            inherit_perms = form_result['inherit_default_permissions']
 
            user.inherit_default_permissions = inherit_perms
 
            Session().add(user)
 
            user_model = UserModel()
 

	
 
            defs = UserToPerm.query() \
 
                .filter(UserToPerm.user == user) \
 
                .all()
 
            for ug in defs:
 
                Session().delete(ug)
 

	
 
            if form_result['create_repo_perm']:
 
                user_model.grant_perm(id, 'hg.create.repository')
 
            else:
 
                user_model.grant_perm(id, 'hg.create.none')
 
            if form_result['create_user_group_perm']:
 
                user_model.grant_perm(id, 'hg.usergroup.create.true')
 
            else:
 
                user_model.grant_perm(id, 'hg.usergroup.create.false')
 
            if form_result['fork_repo_perm']:
 
                user_model.grant_perm(id, 'hg.fork.repository')
 
            else:
 
                user_model.grant_perm(id, 'hg.fork.none')
 
            h.flash(_("Updated permissions"), category='success')
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
kallithea/lib/db_manage.py
Show inline comments
 
@@ -458,43 +458,42 @@ class DbManage(object):
 
        self.create_default_options()
 

	
 
        log.info('created ui config')
 

	
 
    def create_user(self, username, password, email='', admin=False):
 
        log.info('creating user %s', username)
 
        UserModel().create_or_update(username, password, email,
 
                                     firstname=u'Kallithea', lastname=u'Admin',
 
                                     active=True, admin=admin,
 
                                     extern_type=User.DEFAULT_AUTH_TYPE)
 

	
 
    def create_default_user(self):
 
        log.info('creating default user')
 
        # create default user for handling default permissions.
 
        user = UserModel().create_or_update(username=User.DEFAULT_USER,
 
                                            password=str(uuid.uuid1())[:20],
 
                                            email='anonymous@kallithea-scm.org',
 
                                            firstname=u'Anonymous',
 
                                            lastname=u'User')
 
        # based on configuration options activate/deactivate this user which
 
        # controls anonymous access
 
        if self.cli_args.get('public_access') is False:
 
            log.info('Public access disabled')
 
            user.active = False
 
            Session().add(user)
 
            Session().commit()
 

	
 
    def create_permissions(self):
 
        """
 
        Creates all permissions defined in the system
 
        """
 
        # module.(access|create|change|delete)_[name]
 
        # module.(none|read|write|admin)
 
        log.info('creating permissions')
 
        PermissionModel(self.sa).create_permissions()
 

	
 
    def populate_default_permissions(self):
 
        """
 
        Populate default permissions. It will create only the default
 
        permissions that are missing, and not alter already defined ones
 
        """
 
        log.info('creating default user permissions')
 
        PermissionModel(self.sa).create_default_permissions(user=User.DEFAULT_USER)
kallithea/model/db.py
Show inline comments
 
@@ -646,49 +646,48 @@ class User(Base, BaseModel):
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        Session().add(self)
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls, cache=False):
 
        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
@@ -1267,55 +1266,53 @@ class Repository(Base, BaseModel):
 
            owner=repo.user.username,
 
            fork_of=repo.fork.repo_name if repo.fork else None,
 
            enable_statistics=repo.enable_statistics,
 
            enable_locking=repo.enable_locking,
 
            enable_downloads=repo.enable_downloads,
 
            last_changeset=repo.changeset_cache,
 
            locked_by=User.get(self.locked[0]).get_api_data() \
 
                if self.locked[0] else None,
 
            locked_date=time_to_datetime(self.locked[1]) \
 
                if self.locked[1] else None
 
        )
 
        rc_config = Setting.get_app_settings()
 
        repository_fields = str2bool(rc_config.get('repository_fields'))
 
        if repository_fields:
 
            for f in self.extra_fields:
 
                data[f.field_key_prefixed] = f.field_value
 

	
 
        return data
 

	
 
    @classmethod
 
    def lock(cls, repo, user_id, lock_time=None):
 
        if lock_time is not None:
 
            lock_time = time.time()
 
        repo.locked = [user_id, lock_time]
 
        Session().add(repo)
 
        Session().commit()
 

	
 
    @classmethod
 
    def unlock(cls, repo):
 
        repo.locked = None
 
        Session().add(repo)
 
        Session().commit()
 

	
 
    @classmethod
 
    def getlock(cls, repo):
 
        return repo.locked
 

	
 
    @property
 
    def last_db_change(self):
 
        return self.updated_on
 

	
 
    @property
 
    def clone_uri_hidden(self):
 
        clone_uri = self.clone_uri
 
        if clone_uri:
 
            import urlobject
 
            url_obj = urlobject.URLObject(self.clone_uri)
 
            if url_obj.password:
 
                clone_uri = url_obj.with_password('*****')
 
        return clone_uri
 

	
 
    def clone_url(self, **override):
 
        import kallithea.lib.helpers as h
 
        qualified_home_url = h.canonical_url('home')
 

	
 
@@ -1325,98 +1322,97 @@ class Repository(Base, BaseModel):
 
            del override['with_id']
 

	
 
        if 'uri_tmpl' in override:
 
            uri_tmpl = override['uri_tmpl']
 
            del override['uri_tmpl']
 

	
 
        # we didn't override our tmpl from **overrides
 
        if not uri_tmpl:
 
            uri_tmpl = self.DEFAULT_CLONE_URI
 
            try:
 
                from pylons import tmpl_context as c
 
                uri_tmpl = c.clone_uri_tmpl
 
            except AttributeError:
 
                # in any case if we call this outside of request context,
 
                # ie, not having tmpl_context set up
 
                pass
 

	
 
        return get_clone_url(uri_tmpl=uri_tmpl,
 
                             qualified_home_url=qualified_home_url,
 
                             repo_name=self.repo_name,
 
                             repo_id=self.repo_id, **override)
 

	
 
    def set_state(self, state):
 
        self.repo_state = state
 
        Session().add(self)
 

	
 
    #==========================================================================
 
    # SCM PROPERTIES
 
    #==========================================================================
 

	
 
    def get_changeset(self, rev=None):
 
        return get_changeset_safe(self.scm_instance, rev)
 

	
 
    def get_landing_changeset(self):
 
        """
 
        Returns landing changeset, or if that doesn't exist returns the tip
 
        """
 
        _rev_type, _rev = self.landing_rev
 
        cs = self.get_changeset(_rev)
 
        if isinstance(cs, EmptyChangeset):
 
            return self.get_changeset()
 
        return cs
 

	
 
    def update_changeset_cache(self, cs_cache=None):
 
        """
 
        Update cache of last changeset for repository, keys should be::
 

	
 
            short_id
 
            raw_id
 
            revision
 
            message
 
            date
 
            author
 

	
 
        :param cs_cache:
 
        """
 
        from kallithea.lib.vcs.backends.base import BaseChangeset
 
        if cs_cache is None:
 
            cs_cache = EmptyChangeset()
 
            # use no-cache version here
 
            scm_repo = self.scm_instance_no_cache()
 
            if scm_repo:
 
                cs_cache = scm_repo.get_changeset()
 

	
 
        if isinstance(cs_cache, BaseChangeset):
 
            cs_cache = cs_cache.__json__()
 

	
 
        if (not self.changeset_cache or cs_cache['raw_id'] != self.changeset_cache['raw_id']):
 
            _default = datetime.datetime.fromtimestamp(0)
 
            last_change = cs_cache.get('date') or _default
 
            log.debug('updated repo %s with new cs cache %s',
 
                      self.repo_name, cs_cache)
 
            self.updated_on = last_change
 
            self.changeset_cache = cs_cache
 
            Session().add(self)
 
            Session().commit()
 
        else:
 
            log.debug('changeset_cache for %s already up to date with %s',
 
                      self.repo_name, cs_cache['raw_id'])
 

	
 
    @property
 
    def tip(self):
 
        return self.get_changeset('tip')
 

	
 
    @property
 
    def author(self):
 
        return self.tip.author
 

	
 
    @property
 
    def last_change(self):
 
        return self.scm_instance.last_change
 

	
 
    def get_comments(self, revisions=None):
 
        """
 
        Returns comments for this repository grouped by revisions
 

	
 
        :param revisions: filter query by revisions only
 
        """
 
        cmts = ChangesetComment.query() \
 
@@ -2156,52 +2152,52 @@ class CacheInvalidation(Base, BaseModel)
 

	
 
        for inv_obj in inv_objs:
 
            log.debug('marking %s key for invalidation based on repo_name=%s',
 
                      inv_obj, safe_str(repo_name))
 
            Session().delete(inv_obj)
 
        Session().commit()
 

	
 
    @classmethod
 
    def test_and_set_valid(cls, repo_name, kind, valid_cache_keys=None):
 
        """
 
        Mark this cache key as active and currently cached.
 
        Return True if the existing cache registration still was valid.
 
        Return False to indicate that it had been invalidated and caches should be refreshed.
 
        """
 

	
 
        key = (repo_name + '_' + kind) if kind else repo_name
 
        cache_key = cls._get_cache_key(key)
 

	
 
        if valid_cache_keys and cache_key in valid_cache_keys:
 
            return True
 

	
 
        inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
        if inv_obj is None:
 
            inv_obj = cls(cache_key, repo_name)
 
            Session().add(inv_obj)
 
        elif inv_obj.cache_active:
 
            return True
 
        inv_obj.cache_active = True
 
        Session().add(inv_obj)
 
        try:
 
            Session().commit()
 
        except sqlalchemy.exc.IntegrityError:
 
            log.error('commit of CacheInvalidation failed - retrying')
 
            Session().rollback()
 
            inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
            if inv_obj is None:
 
                log.error('failed to create CacheInvalidation entry')
 
                # TODO: fail badly?
 
            # else: TOCTOU - another thread added the key at the same time; no further action required
 
        return False
 

	
 
    @classmethod
 
    def get_valid_cache_keys(cls):
 
        """
 
        Return opaque object with information of which caches still are valid
 
        and can be used without checking for invalidation.
 
        """
 
        return set(inv_obj.cache_key for inv_obj in cls.query().filter(cls.cache_active).all())
 

	
 

	
 
class ChangesetComment(Base, BaseModel):
 
    __tablename__ = 'changeset_comments'
 
    __table_args__ = (
 
@@ -2514,49 +2510,48 @@ class Notification(Base, BaseModel):
 

	
 
    @property
 
    def description(self):
 
        from kallithea.model.notification import NotificationModel
 
        return NotificationModel().make_description(self)
 

	
 

	
 
class UserNotification(Base, BaseModel):
 
    __tablename__ = 'user_to_notification'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'notification_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), primary_key=True)
 
    notification_id = Column(Integer(), ForeignKey('notifications.notification_id'), primary_key=True)
 
    read = Column(Boolean, nullable=False, default=False)
 
    sent_on = Column(DateTime(timezone=False), nullable=True) # FIXME: not nullable?
 

	
 
    user = relationship('User')
 
    notification = relationship('Notification')
 

	
 
    def mark_as_read(self):
 
        self.read = True
 
        Session().add(self)
 

	
 

	
 
class Gist(Base, BaseModel):
 
    __tablename__ = 'gists'
 
    __table_args__ = (
 
        Index('g_gist_access_id_idx', 'gist_access_id'),
 
        Index('g_created_on_idx', 'created_on'),
 
        _table_args_default_dict,
 
    )
 

	
 
    GIST_PUBLIC = u'public'
 
    GIST_PRIVATE = u'private'
 
    DEFAULT_FILENAME = u'gistfile1.txt'
 

	
 
    gist_id = Column(Integer(), primary_key=True)
 
    gist_access_id = Column(Unicode(250), nullable=False)
 
    gist_description = Column(UnicodeText(), nullable=False)
 
    gist_owner = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    gist_expires = Column(Float(53), nullable=False)
 
    gist_type = Column(Unicode(128), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    owner = relationship('User')
kallithea/model/notification.py
Show inline comments
 
@@ -166,69 +166,67 @@ class NotificationModel(BaseModel):
 
        q = UserNotification.query() \
 
            .filter(UserNotification.user == user) \
 
            .join((Notification, UserNotification.notification_id ==
 
                                 Notification.notification_id)) \
 
            .options(joinedload('notification')) \
 
            .options(subqueryload('notification.created_by_user')) \
 
            .order_by(Notification.created_on.desc())
 

	
 
        if filter_:
 
            q = q.filter(Notification.type_.in_(filter_))
 

	
 
        return q
 

	
 
    def mark_read(self, user, notification):
 
        try:
 
            notification = Notification.guess_instance(notification)
 
            user = self._get_user(user)
 
            if notification and user:
 
                obj = UserNotification.query() \
 
                        .filter(UserNotification.user == user) \
 
                        .filter(UserNotification.notification
 
                                == notification) \
 
                        .one()
 
                obj.read = True
 
                Session().add(obj)
 
                return True
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def mark_all_read_for_user(self, user, filter_=None):
 
        user = self._get_user(user)
 
        q = UserNotification.query() \
 
            .filter(UserNotification.user == user) \
 
            .filter(UserNotification.read == False) \
 
            .join((Notification, UserNotification.notification_id ==
 
                                 Notification.notification_id))
 
        if filter_:
 
            q = q.filter(Notification.type_.in_(filter_))
 

	
 
        # this is a little inefficient but sqlalchemy doesn't support
 
        # update on joined tables :(
 
        for obj in q.all():
 
        for obj in q:
 
            obj.read = True
 
            Session().add(obj)
 

	
 
    def get_unread_cnt_for_user(self, user):
 
        user = self._get_user(user)
 
        return UserNotification.query() \
 
                .filter(UserNotification.read == False) \
 
                .filter(UserNotification.user == user).count()
 

	
 
    def get_unread_for_user(self, user):
 
        user = self._get_user(user)
 
        return [x.notification for x in UserNotification.query() \
 
                .filter(UserNotification.read == False) \
 
                .filter(UserNotification.user == user).all()]
 

	
 
    def get_user_notification(self, user, notification):
 
        user = self._get_user(user)
 
        notification = Notification.guess_instance(notification)
 

	
 
        return UserNotification.query() \
 
            .filter(UserNotification.notification == notification) \
 
            .filter(UserNotification.user == user).scalar()
 

	
 
    def make_description(self, notification, show_age=True):
 
        """
 
        Creates a human readable description based on properties
kallithea/model/pull_request.py
Show inline comments
 
@@ -185,25 +185,24 @@ class PullRequestModel(BaseModel):
 
        new_reviewer_users = set(self._get_valid_reviewers(reviewers_ids))
 

	
 
        to_add = new_reviewer_users - current_reviewer_users
 
        to_remove = current_reviewer_users - new_reviewer_users
 

	
 
        if not to_add and not to_remove:
 
            return # all done
 

	
 
        log.debug("Adding %s reviewers", to_add)
 
        self.__add_reviewers(user, pull_request, to_add, set())
 

	
 
        log.debug("Removing %s reviewers", to_remove)
 
        for prr in current_reviewers:
 
            if prr.user in to_remove:
 
                Session().delete(prr)
 

	
 
    def delete(self, pull_request):
 
        pull_request = PullRequest.guess_instance(pull_request)
 
        Session().delete(pull_request)
 

	
 
    def close_pull_request(self, pull_request):
 
        pull_request = PullRequest.guess_instance(pull_request)
 
        pull_request.status = PullRequest.STATUS_CLOSED
 
        pull_request.updated_on = datetime.datetime.now()
 
        Session().add(pull_request)
kallithea/model/user.py
Show inline comments
 
@@ -385,49 +385,48 @@ class UserModel(BaseModel):
 

	
 
        if token_age < 0:
 
            log.debug('timestamp is from the future')
 
            return False
 

	
 
        if token_age > UserModel.password_reset_token_lifetime:
 
            log.debug('password reset token expired')
 
            return False
 

	
 
        expected_token = self.get_reset_password_token(user,
 
                                                       timestamp,
 
                                                       h.authentication_token())
 
        log.debug('computed password reset token: %s', expected_token)
 
        log.debug('received password reset token: %s', token)
 
        return expected_token == token
 

	
 
    def reset_password(self, user_email, new_passwd):
 
        from kallithea.lib.celerylib import tasks
 
        from kallithea.lib import auth
 
        user = User.get_by_email(user_email)
 
        if user is not None:
 
            if not self.can_change_password(user):
 
                raise Exception('trying to change password for external user')
 
            user.password = auth.get_crypt_password(new_passwd)
 
            Session().add(user)
 
            Session().commit()
 
            log.info('change password for %s', user_email)
 
        if new_passwd is None:
 
            raise Exception('unable to set new password')
 

	
 
        tasks.send_email([user_email],
 
                 _('Password reset notification'),
 
                 _('The password to your account %s has been changed using password reset form.') % (user.username,))
 
        log.info('send password reset mail to %s', user_email)
 

	
 
        return True
 

	
 
    def has_perm(self, user, perm):
 
        perm = self._get_perm(perm)
 
        user = self._get_user(user)
 

	
 
        return UserToPerm.query().filter(UserToPerm.user == user) \
 
            .filter(UserToPerm.permission == perm).scalar() is not None
 

	
 
    def grant_perm(self, user, perm):
 
        """
 
        Grant user global permissions
 

	
 
        :param user:
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -258,49 +258,48 @@ class _BaseTestApi(object):
 

	
 
    def test_api_get_user_without_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(self.TEST_USER_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_with_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull(self):
 
        repo_name = u'test_pull'
 
        r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        r.clone_uri = os.path.join(Ui.get_by_key('paths', '/').ui_value, self.REPO)
 
        Session.add(r)
 
        Session.commit()
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_pull_error(self):
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=self.REPO, )
 
        response = api_call(self, params)
 

	
 
        expected = 'Unable to pull changes from `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_rescan_repos(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos')
 
        response = api_call(self, params)
 

	
kallithea/tests/fixture.py
Show inline comments
 
@@ -37,56 +37,54 @@ def error_function(*args, **kwargs):
 

	
 

	
 
class Fixture(object):
 

	
 
    def __init__(self):
 
        pass
 

	
 
    def anon_access(self, status):
 
        """
 
        Context manager for controlling anonymous access.
 
        Anon access will be set and committed, but restored again when exiting the block.
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
        with fixture.anon_access(False):
 
            stuff
 
        """
 

	
 
        class context(object):
 
            def __enter__(self):
 
                anon = User.get_default_user()
 
                self._before = anon.active
 
                anon.active = status
 
                Session().add(anon)
 
                Session().commit()
 
                invalidate_all_caches()
 

	
 
            def __exit__(self, exc_type, exc_val, exc_tb):
 
                anon = User.get_default_user()
 
                anon.active = self._before
 
                Session().add(anon)
 
                Session().commit()
 

	
 
        return context()
 

	
 
    def _get_repo_create_params(self, **custom):
 
        defs = dict(
 
            repo_name=None,
 
            repo_type='hg',
 
            clone_uri='',
 
            repo_group=u'-1',
 
            repo_description=u'DESC',
 
            repo_private=False,
 
            repo_landing_rev='rev:tip',
 
            repo_copy_permissions=False,
 
            repo_state=Repository.STATE_CREATED,
 
        )
 
        defs.update(custom)
 
        if 'repo_name_full' not in custom:
 
            defs.update({'repo_name_full': defs['repo_name']})
 

	
 
        # fix the repo name if passed as repo_name_full
 
        if defs['repo_name']:
 
            defs['repo_name'] = defs['repo_name'].split('/')[-1]
 

	
kallithea/tests/functional/test_admin_gists.py
Show inline comments
 
@@ -69,49 +69,48 @@ class TestGistsController(TestController
 
                                         'filename': 'foo',
 
                                         'public': 'public',
 
                                         '_authentication_token': self.authentication_token()},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: foo')
 
        response.mustcontain('gist test')
 
        response.mustcontain('<div class="btn btn-mini btn-success disabled">Public Gist</div>')
 

	
 
    def test_create_with_path_with_dirs(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': '/home/foo',
 
                                         'public': 'public',
 
                                         '_authentication_token': self.authentication_token()},
 
                                 status=200)
 
        response.mustcontain('Filename cannot be inside a directory')
 

	
 
    def test_access_expired_gist(self):
 
        self.log_user()
 
        gist = _create_gist('never-see-me')
 
        gist.gist_expires = 0  # 1970
 
        Session().add(gist)
 
        Session().commit()
 

	
 
        response = self.app.get(url('gist', gist_id=gist.gist_access_id), status=404)
 

	
 
    def test_create_private(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'private gist test',
 
                                         'filename': 'private-foo',
 
                                         'private': 'private',
 
                                         '_authentication_token': self.authentication_token()},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: private-foo<')
 
        response.mustcontain('private gist test')
 
        response.mustcontain('<div class="btn btn-mini btn-warning disabled">Private Gist</div>')
 

	
 
    def test_create_with_description(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': 'foo-desc',
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
@@ -455,49 +455,48 @@ class _BaseTestCase(TestController):
 
        assert Repository.get_by_repo_name(self.REPO).private == True
 

	
 
        #now the repo default permission should be None
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        assert len(perm), 1
 
        assert perm[0].permission.permission_name == 'repository.none'
 

	
 
        response = self.app.post(url('update_repo', repo_name=self.REPO),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=self.REPO,
 
                                                repo_type=self.REPO_TYPE,
 
                                                user=TEST_USER_ADMIN_LOGIN,
 
                                                _authentication_token=self.authentication_token()))
 
        self.checkSessionFlash(response,
 
                               msg='Repository %s updated successfully' % (self.REPO))
 
        assert Repository.get_by_repo_name(self.REPO).private == False
 

	
 
        #we turn off private now the repo default permission should stay None
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        assert len(perm), 1
 
        assert perm[0].permission.permission_name == 'repository.none'
 

	
 
        #update this permission back
 
        perm[0].permission = Permission.get_by_key('repository.read')
 
        Session().add(perm[0])
 
        Session().commit()
 

	
 
    def test_set_repo_fork_has_no_self_id(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        response = self.app.get(url('edit_repo_advanced', repo_name=self.REPO))
 
        opt = """<option value="%s">%s</option>""" % (repo.repo_id, self.REPO)
 
        response.mustcontain(no=[opt])
 

	
 
    def test_set_fork_of_other_repo(self):
 
        self.log_user()
 
        other_repo = u'other_%s' % self.REPO_TYPE
 
        fixture.create_repo(other_repo, repo_type=self.REPO_TYPE)
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(other_repo)
 
        response = self.app.post(url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo2.repo_id, _authentication_token=self.authentication_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(other_repo)
 
        self.checkSessionFlash(response,
 
            'Marked repository %s as fork of %s' % (repo.repo_name, repo2.repo_name))
 

	
 
        assert repo.fork == repo2
 
        response = response.follow()
kallithea/tests/functional/test_files.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import os
 
import posixpath
 
import mimetypes
 
from kallithea.tests.base import *
 
from kallithea.model.db import Repository
 
from kallithea.model.meta import Session
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 
ARCHIVE_SPECS = {
 
    '.tar.bz2': ('application/x-bzip2', 'tbz2', ''),
 
    '.tar.gz': ('application/x-gzip', 'tgz', ''),
 
    '.zip': ('application/zip', 'zip', ''),
 
}
 

	
 
HG_NODE_HISTORY = fixture.load_resource('hg_node_history_response.json')
 
GIT_NODE_HISTORY = fixture.load_resource('git_node_history_response.json')
 

	
 

	
 
def _set_downloads(repo_name, set_to):
 
    repo = Repository.get_by_repo_name(repo_name)
 
    repo.enable_downloads = set_to
 
    Session().add(repo)
 
    Session().commit()
 

	
 

	
 
class TestFilesController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='/'))
 
        # Test response...
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/docs"><i class="icon-folder-open"></i><span>docs</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/vcs"><i class="icon-folder-open"></i><span>vcs</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.gitignore"><i class="icon-doc"></i><span>.gitignore</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.hgignore"><i class="icon-doc"></i><span>.hgignore</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.hgtags"><i class="icon-doc"></i><span>.hgtags</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.travis.yml"><i class="icon-doc"></i><span>.travis.yml</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/MANIFEST.in"><i class="icon-doc"></i><span>MANIFEST.in</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/README.rst"><i class="icon-doc"></i><span>README.rst</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/run_test_and_report.sh"><i class="icon-doc"></i><span>run_test_and_report.sh</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/setup.cfg"><i class="icon-doc"></i><span>setup.cfg</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/setup.py"><i class="icon-doc"></i><span>setup.py</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/test_and_report.sh"><i class="icon-doc"></i><span>test_and_report.sh</span></a>' % HG_REPO)
kallithea/tests/functional/test_login.py
Show inline comments
 
@@ -475,32 +475,31 @@ class TestLoginController(TestController
 
                             status=code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=new_api_key.api_key),
 
                             status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            #patch the API key and make it expired
 
            new_api_key.expires = 0
 
            Session().add(new_api_key)
 
            Session().commit()
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip',
 
                                 api_key=new_api_key.api_key),
 
                             status=302)
kallithea/tests/functional/test_pullrequests.py
Show inline comments
 
@@ -126,49 +126,48 @@ class TestPullrequestsController(TestCon
 
        assert response.status == '302 Found'
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search('/pull-request/(\d+)/', response.location)
 
        assert m != None
 
        pull_request_id = m.group(1)
 

	
 
        # edit it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': invalid_user_id,
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_id)
 

	
 
class TestPullrequestsGetRepoRefs(TestController):
 

	
 
    def setup_method(self, method):
 
        self.main = fixture.create_repo(u'main', repo_type='hg')
 
        Session.add(self.main)
 
        Session.commit()
 
        self.c = PullrequestsController()
 

	
 
    def teardown_method(self, method):
 
        fixture.destroy_repo(u'main')
 
        Session.commit()
 
        Session.remove()
 

	
 
    def test_repo_refs_empty_repo(self):
 
        # empty repo with no commits, no branches, no bookmarks, just one tag
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        assert default == 'tag:null:0000000000000000000000000000000000000000'
 

	
 
    def test_repo_refs_one_commit_no_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        assert default == 'branch:default:%s' % cs0.raw_id
 
        assert ([('branch:default:%s' % cs0.raw_id, 'default (current tip)')],
 
                'Branches') in refs
 

	
 
    def test_repo_refs_one_commit_rev_hint(self):
kallithea/tests/functional/test_summary.py
Show inline comments
 
@@ -91,49 +91,48 @@ class TestSummaryController(TestControll
 
            response.mustcontain("repo_1")
 
        finally:
 
            RepoModel().delete(Repository.get_by_repo_name(u'repo_1'))
 
            Session().commit()
 

	
 
    def test_index_by_id_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">git"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
    def _enable_stats(self, repo):
 
        r = Repository.get_by_repo_name(repo)
 
        r.enable_statistics = True
 
        Session().add(r)
 
        Session().commit()
 

	
 
    def test_index_trending(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(HG_REPO)
 

	
 
        ScmModel().mark_for_invalidation(HG_REPO)
 
        # generate statistics first
 
        response = self.app.get(url(controller='summary', action='statistics',
 
                                    repo_name=HG_REPO))
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=HG_REPO))
 
        response.mustcontain(
 
            '[["py", {"count": 68, "desc": ["Python"]}], '
 
            '["rst", {"count": 16, "desc": ["Rst"]}], '
 
            '["css", {"count": 2, "desc": ["Css"]}], '
 
            '["sh", {"count": 2, "desc": ["Bash"]}], '
 
            '["yml", {"count": 1, "desc": ["Yaml"]}], '
 
            '["makefile", {"count": 1, "desc": ["Makefile", "Makefile"]}], '
 
            '["js", {"count": 1, "desc": ["Javascript"]}], '
 
            '["cfg", {"count": 1, "desc": ["Ini"]}], '
 
            '["ini", {"count": 1, "desc": ["Ini"]}], '
 
            '["html", {"count": 1, "desc": ["EvoqueHtml", "Html"]}]];'
kallithea/tests/models/test_permissions.py
Show inline comments
 
@@ -669,29 +669,28 @@ class TestPermissions(TestController):
 

	
 
    @parametrize('perm,modify_to', [
 
        ('repository.read', 'repository.none'),
 
        ('group.read', 'group.none'),
 
        ('usergroup.read', 'usergroup.none'),
 
        ('hg.create.repository', 'hg.create.none'),
 
        ('hg.fork.repository', 'hg.fork.none'),
 
        ('hg.register.manual_activate', 'hg.register.auto_activate',)
 
    ])
 
    def test_set_default_permissions_after_modification(self, perm, modify_to):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
        old = Permission.get_by_key(perm)
 
        new = Permission.get_by_key(modify_to)
 
        assert old != None
 
        assert new != None
 

	
 
        #now modify permissions
 
        p = UserToPerm.query() \
 
                .filter(UserToPerm.user == self.u1) \
 
                .filter(UserToPerm.permission == old) \
 
                .one()
 
        p.permission = new
 
        Session().add(p)
 
        Session().commit()
 

	
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
kallithea/tests/models/test_settings.py
Show inline comments
 
@@ -11,37 +11,36 @@ def test_passing_list_setting_value_resu
 
    try:
 
        assert Setting.get_by_name(name) is not None
 
        # Quirk: list value is stringified.
 
        assert Setting.get_by_name(name).app_settings_value \
 
               == "['spam', 'eggs']"
 
        assert Setting.get_by_name(name).app_settings_type == 'unicode'
 
    finally:
 
        Session().delete(setting)
 
        Session().flush()
 

	
 
def test_list_valued_setting_creation_requires_manual_value_formatting():
 
    assert Setting.get_by_name(name) is None
 
    # Quirk: need manual formatting of list setting value.
 
    setting = Setting.create_or_update(name, 'spam,eggs', type='list')
 
    Session().flush()
 
    try:
 
        assert setting.app_settings_value == ['spam', 'eggs']
 
    finally:
 
        Session().delete(setting)
 
        Session().flush()
 

	
 
def test_list_valued_setting_update():
 
    assert Setting.get_by_name(name) is None
 
    setting = Setting.create_or_update(name, 'spam', type='list')
 
    Session().add(setting)
 
    Session().flush()
 
    try:
 
        assert setting.app_settings_value == [u'spam']
 
        # Assign back setting value.
 
        setting.app_settings_value = setting.app_settings_value
 
        # Quirk: value is stringified on write and listified on read.
 
        assert setting.app_settings_value == ["[u'spam']"]
 
        setting.app_settings_value = setting.app_settings_value
 
        assert setting.app_settings_value == ["[u\"[u'spam']\"]"]
 
    finally:
 
        Session().delete(setting)
 
        Session().flush()
kallithea/tests/other/manual_test_vcs_operations.py
Show inline comments
 
@@ -139,86 +139,83 @@ def _add_files_and_push(vcs, DEST, **kwa
 

	
 
    # PUSH it back
 
    _REPO = None
 
    if vcs == 'hg':
 
        _REPO = HG_REPO
 
    elif vcs == 'git':
 
        _REPO = GIT_REPO
 

	
 
    kwargs['dest'] = ''
 
    clone_url = _construct_url(_REPO, **kwargs)
 
    if 'clone_url' in kwargs:
 
        clone_url = kwargs['clone_url']
 
    stdout = stderr = None
 
    if vcs == 'hg':
 
        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
 
    elif vcs == 'git':
 
        stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")
 

	
 
    return stdout, stderr
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_by_username(User.DEFAULT_USER)
 
    user.active = enable
 
    Session().add(user)
 
    Session().commit()
 
    print '\tanonymous access is now:', enable
 
    if enable != User.get_by_username(User.DEFAULT_USER).active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 

	
 
def _check_proper_git_push(stdout, stderr):
 
    #WTF Git stderr is output ?!
 
    assert 'fatal' not in stderr
 
    assert 'rejected' not in stderr
 
    assert 'Pushing to' in stderr
 
    assert 'master -> master' in stderr
 

	
 

	
 
class TestVCSOperations(TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        #DISABLE ANONYMOUS ACCESS
 
        set_anonymous_access(False)
 

	
 
    def setup_method(self, method):
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
    def test_clone_hg_repo_by_admin(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_clone_git_repo_by_admin(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
 

	
 
    def test_clone_wrong_credentials_hg(self):
 
        clone_url = _construct_url(HG_REPO, passwd='bad!')
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'abort: authorization failed' in stderr
 
@@ -259,74 +256,74 @@ class TestVCSOperations(TestController):
 
        stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
    def test_push_new_file_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        # commit some stuff into this repo
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        clone_url = _construct_url(fork_name).split()[0]
 
        stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url)
 
        print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for
 
        _check_proper_git_push(stdout, stderr)
 

	
 
    def test_push_invalidates_cache_hg(self):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==HG_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(HG_REPO, HG_REPO)
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().add(key)
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        clone_url = _construct_url(fork_name).split()[0]
 
        stdout, stderr = _add_files_and_push('hg', DEST, files_no=1, clone_url=clone_url)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==fork_name).all()
 
        assert key == []
 

	
 
    def test_push_invalidates_cache_git(self):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==GIT_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(GIT_REPO, GIT_REPO)
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().add(key)
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        # commit some stuff into this repo
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        clone_url = _construct_url(fork_name).split()[0]
 
        stdout, stderr = _add_files_and_push('git', DEST, files_no=1, clone_url=clone_url)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==fork_name).all()
 
        assert key == []
 

	
 
    def test_push_wrong_credentials_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
 
                                             passwd='name')
 
@@ -346,63 +343,61 @@ class TestVCSOperations(TestController):
 
    def test_push_back_to_wrong_url_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST,
 
                                    clone_url='http://%s/tmp' % HOST)
 

	
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_push_back_to_wrong_url_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('git', DEST,
 
                                    clone_url='http://%s/tmp' % HOST)
 

	
 
        assert 'not found' in stderr
 

	
 
    def test_clone_and_create_lock_hg(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
    def test_clone_and_create_lock_git(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
    def test_clone_after_repo_was_locked_hg(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        #pull fails since repo is locked
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    def test_clone_after_repo_was_locked_git(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
@@ -448,76 +443,74 @@ class TestVCSOperations(TestController):
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        #push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push('git', DEST,
 
                                             user=TEST_USER_REGULAR_LOGIN,
 
                                             passwd=TEST_USER_REGULAR_PASS)
 
        err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
 
        assert err in stderr
 

	
 
        #TODO: fix this somehow later on Git, Git is stupid and even if we throw
 
        #back 423 to it, it makes ANOTHER request and we fail there with 405 :/
 

	
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
 
        #msg = "405 Method Not Allowed"
 
        #assert msg in stderr
 

	
 
    def test_push_unlocks_repository_hg(self):
 
        # enable locking
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(fork_name, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 
        assert r.locked[0] == uid
 

	
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url.split()[0])
 
        assert str('remote: Released lock on repo `%s`' % fork_name) in stdout
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked == [None, None]
 

	
 
    #TODO: fix me ! somehow during tests hooks don't get called on Git
 
    def test_push_unlocks_repository_git(self):
 
        # enable locking
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(fork_name, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url.split()[0])
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        assert ('remote: Released lock on repo `%s`' % fork_name) in stderr
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked == [None, None]
 

	
 
    def test_ip_restriction_hg(self):
 
        user_model = UserModel()
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
0 comments (0 inline, 0 general)