Changeset - 79c5967a1e5c
[Not reviewed]
rhodecode/lib/dbmigrate/schema/db_1_4_0.py
Show inline comments
 
@@ -1719,97 +1719,96 @@ class PullRequestReviewers(Base, BaseMod
 

	
 
    pull_requests_reviewers_id = Column('pull_requests_reviewers_id', Integer(), nullable=False, primary_key=True)
 
    pull_request_id = Column("pull_request_id", Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=False)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True)
 

	
 
    user = relationship('User')
 
    pull_request = relationship('PullRequest')
 

	
 

	
 
class Notification(Base, BaseModel):
 
    __tablename__ = 'notifications'
 
    __table_args__ = (
 
        Index('notification_type_idx', 'type'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 

	
 
    TYPE_CHANGESET_COMMENT = u'cs_comment'
 
    TYPE_MESSAGE = u'message'
 
    TYPE_MENTION = u'mention'
 
    TYPE_REGISTRATION = u'registration'
 
    TYPE_PULL_REQUEST = u'pull_request'
 
    TYPE_PULL_REQUEST_COMMENT = u'pull_request_comment'
 

	
 
    notification_id = Column('notification_id', Integer(), nullable=False, primary_key=True)
 
    subject = Column('subject', Unicode(512), nullable=True)
 
    body = Column('body', UnicodeText(50000), nullable=True)
 
    created_by = Column("created_by", Integer(), ForeignKey('users.user_id'), nullable=True)
 
    created_on = Column('created_on', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    type_ = Column('type', Unicode(256))
 

	
 
    created_by_user = relationship('User')
 
    notifications_to_users = relationship('UserNotification', lazy='joined',
 
                                          cascade="all, delete, delete-orphan")
 

	
 
    @property
 
    def recipients(self):
 
        return [x.user for x in UserNotification.query()\
 
                .filter(UserNotification.notification == self)\
 
                .order_by(UserNotification.user_id.asc()).all()]
 

	
 
    @classmethod
 
    def create(cls, created_by, subject, body, recipients, type_=None):
 
        if type_ is None:
 
            type_ = Notification.TYPE_MESSAGE
 

	
 
        notification = cls()
 
        notification.created_by_user = created_by
 
        notification.subject = subject
 
        notification.body = body
 
        notification.type_ = type_
 
        notification.created_on = datetime.datetime.now()
 

	
 
        for u in recipients:
 
            assoc = UserNotification()
 
            assoc.notification = notification
 
            u.notifications.append(assoc)
 
        Session().add(notification)
 
        return notification
 

	
 
    @property
 
    def description(self):
 
        from rhodecode.model.notification import NotificationModel
 
        return NotificationModel().make_description(self)
 

	
 

	
 
class UserNotification(Base, BaseModel):
 
    __tablename__ = 'user_to_notification'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'notification_id'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    user_id = Column('user_id', Integer(), ForeignKey('users.user_id'), primary_key=True)
 
    notification_id = Column("notification_id", Integer(), ForeignKey('notifications.notification_id'), primary_key=True)
 
    read = Column('read', Boolean, default=False)
 
    sent_on = Column('sent_on', DateTime(timezone=False), nullable=True, unique=None)
 

	
 
    user = relationship('User', lazy="joined")
 
    notification = relationship('Notification', lazy="joined",
 
                                order_by=lambda: Notification.created_on.desc(),)
 

	
 
    def mark_as_read(self):
 
        self.read = True
 
        Session().add(self)
 

	
 

	
 
class DbMigrateVersion(Base, BaseModel):
 
    __tablename__ = 'db_migrate_version'
 
    __table_args__ = (
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 
    repository_id = Column('repository_id', String(250), primary_key=True)
 
    repository_path = Column('repository_path', Text)
 
    version = Column('version', Integer)
 

	
rhodecode/lib/vcs/backends/git/changeset.py
Show inline comments
 
@@ -198,193 +198,193 @@ class GitChangeset(BaseChangeset):
 
                return _next(cs, branch)
 

	
 
            return cs
 

	
 
        return _next(self, branch)
 

	
 
    def prev(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        def _prev(changeset, branch):
 
            try:
 
                prev_ = changeset.revision - 1
 
                if prev_ < 0:
 
                    raise IndexError
 
                prev_rev = changeset.repository.revisions[prev_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 

	
 
            cs = changeset.repository.get_changeset(prev_rev)
 

	
 
            if branch and branch != cs.branch:
 
                return _prev(cs, branch)
 

	
 
            return cs
 

	
 
        return _prev(self, branch)
 

	
 
    def diff(self, ignore_whitespace=True, context=3):
 
        rev1 = self.parents[0] if self.parents else self.repository.EMPTY_CHANGESET
 
        rev2 = self
 
        return ''.join(self.repository.get_diff(rev1, rev2,
 
                                    ignore_whitespace=ignore_whitespace,
 
                                    context=context))
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        # ensure path is traversed
 
        self._get_id_for_path(path)
 
        return self._stat_modes[path]
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at given ``path``.
 
        """
 
        id = self._get_id_for_path(path)
 
        blob = self.repository._repo[id]
 
        return blob.as_pretty_string()
 

	
 
    def get_file_size(self, path):
 
        """
 
        Returns size of the file at given ``path``.
 
        """
 
        id = self._get_id_for_path(path)
 
        blob = self.repository._repo[id]
 
        return blob.raw_length()
 

	
 
    def get_file_changeset(self, path):
 
        """
 
        Returns last commit of the file at the given ``path``.
 
        """
 
        node = self.get_node(path)
 
        return node.history[0]
 

	
 
    def get_file_history(self, path):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 

	
 
        TODO: This function now uses os underlying 'git' and 'grep' commands
 
        which is generally not good. Should be replaced with algorithm
 
        iterating commits.
 
        """
 
        self._get_filectx(path)
 

	
 
        cmd = 'log --pretty="format: %%H" -s -p %s -- "%s"' % (
 
                  self.id, path
 
               )
 
        so, se = self.repository.run_git_command(cmd)
 
        ids = re.findall(r'[0-9a-fA-F]{40}', so)
 
        return [self.repository.get_changeset(id) for id in ids]
 

	
 
    def get_file_history_2(self, path):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 

	
 
        """
 
        self._get_filectx(path)
 
        from dulwich.walk import Walker
 
        include = [self.id]
 
        walker = Walker(self.repository._repo.object_store, include,
 
                        paths=[path], max_entries=1)
 
        return [self.repository.get_changeset(sha) 
 
        return [self.repository.get_changeset(sha)
 
                for sha in (x.commit.id for x in walker)]
 

	
 
    def get_file_annotate(self, path):
 
        """
 
        Returns a generator of four element tuples with
 
            lineno, sha, changeset lazy loader and line
 

	
 
        TODO: This function now uses os underlying 'git' command which is
 
        generally not good. Should be replaced with algorithm iterating
 
        commits.
 
        """
 
        cmd = 'blame -l --root -r %s -- "%s"' % (self.id, path)
 
        # -l     ==> outputs long shas (and we need all 40 characters)
 
        # --root ==> doesn't put '^' character for bounderies
 
        # -r sha ==> blames for the given revision
 
        so, se = self.repository.run_git_command(cmd)
 

	
 
        for i, blame_line in enumerate(so.split('\n')[:-1]):
 
            ln_no = i + 1
 
            sha, line = re.split(r' ', blame_line, 1)
 
            yield (ln_no, sha, lambda: self.repository.get_changeset(sha), line)
 

	
 
    def fill_archive(self, stream=None, kind='tgz', prefix=None,
 
                     subrepos=False):
 
        """
 
        Fills up given stream.
 

	
 
        :param stream: file like object.
 
        :param kind: one of following: ``zip``, ``tgz`` or ``tbz2``.
 
            Default: ``tgz``.
 
        :param prefix: name of root directory in archive.
 
            Default is repository name and changeset's raw_id joined with dash
 
            (``repo-tip.<KIND>``).
 
        :param subrepos: include subrepos in this archive.
 

	
 
        :raise ImproperArchiveTypeError: If given kind is wrong.
 
        :raise VcsError: If given stream is None
 

	
 
        """
 
        allowed_kinds = settings.ARCHIVE_SPECS.keys()
 
        if kind not in allowed_kinds:
 
            raise ImproperArchiveTypeError('Archive kind not supported use one'
 
                'of %s', allowed_kinds)
 

	
 
        if prefix is None:
 
            prefix = '%s-%s' % (self.repository.name, self.short_id)
 
        elif prefix.startswith('/'):
 
            raise VCSError("Prefix cannot start with leading slash")
 
        elif prefix.strip() == '':
 
            raise VCSError("Prefix cannot be empty")
 

	
 
        if kind == 'zip':
 
            frmt = 'zip'
 
        else:
 
            frmt = 'tar'
 
        cmd = 'git archive --format=%s --prefix=%s/ %s' % (frmt, prefix,
 
            self.raw_id)
 
        if kind == 'tgz':
 
            cmd += ' | gzip -9'
 
        elif kind == 'tbz2':
 
            cmd += ' | bzip2 -9'
 

	
 
        if stream is None:
 
            raise VCSError('You need to pass in a valid stream for filling'
 
                           ' with archival data')
 
        popen = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True,
 
            cwd=self.repository.path)
 

	
 
        buffer_size = 1024 * 8
 
        chunk = popen.stdout.read(buffer_size)
 
        while chunk:
 
            stream.write(chunk)
 
            chunk = popen.stdout.read(buffer_size)
 
        # Make sure all descriptors would be read
 
        popen.communicate()
 

	
 
    def get_nodes(self, path):
 
        if self._get_kind(path) != NodeKind.DIR:
 
            raise ChangesetError("Directory does not exist for revision %r at "
 
                " %r" % (self.revision, path))
 
        path = self._fix_path(path)
 
        id = self._get_id_for_path(path)
 
        tree = self.repository._repo[id]
 
        dirnodes = []
 
        filenodes = []
 
        als = self.repository.alias
 
        for name, stat, id in tree.iteritems():
 
            if objects.S_ISGITLINK(stat):
 
                dirnodes.append(SubModuleNode(name, url=None, changeset=id,
 
                                              alias=als))
 
                continue
 

	
 
            obj = self.repository._repo.get_object(id)
 
            if path != '':
 
                obj_path = '/'.join((path, name))
 
            else:
rhodecode/lib/vcs/utils/lazy.py
Show inline comments
 
class LazyProperty(object):
 
    """
 
    Decorator for easier creation of ``property`` from potentially expensive to
 
    calculate attribute of the class.
 

	
 
    Usage::
 

	
 
      class Foo(object):
 
          @LazyProperty
 
          def bar(self):
 
              print 'Calculating self._bar'
 
              return 42
 

	
 
    Taken from http://blog.pythonisito.com/2008/08/lazy-descriptors.html and
 
    used widely.
 
    """
 

	
 
    def __init__(self, func):
 
        self._func = func
 
        self.__module__ = func.__module__
 
        self.__name__ = func.__name__
 
        self.__doc__ = func.__doc__
 

	
 
    def __get__(self, obj, klass=None):
 
        if obj is None:
 
            return self
 
        result = obj.__dict__[self.__name__] = self._func(obj)
 
        return result
 

	
 
import threading
 

	
 

	
 
class ThreadLocalLazyProperty(LazyProperty):
 
    """
 
    Same as above but uses thread local dict for cache storage.
 
    """
 

	
 
    def __get__(self, obj, klass=None):
 
        if obj is None:
 
            return self
 
        if not hasattr(obj, '__tl_dict__'):
 
            obj.__tl_dict__ = threading.local().__dict__
 

	
 
        result = obj.__tl_dict__[self.__name__] = self._func(obj)
 
        return result
 

	
rhodecode/templates/admin/defaults/defaults.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="/base/base.html"/>
 

	
 
<%def name="title()">
 
    ${_('Repositories defaults')} - ${c.rhodecode_name}
 
</%def>
 

	
 
<%def name="breadcrumbs_links()">
 
    ${h.link_to(_('Admin'),h.url('admin_home'))}
 
    &raquo;
 
    ${_('Defaults')}
 
</%def>
 

	
 
<%def name="page_nav()">
 
	${self.menu('admin')}
 
</%def>
 

	
 
<%def name="main()">
 
<div class="box">
 
    <!-- box / title -->
 
    <div class="title">
 
        ${self.breadcrumbs()}
 
    </div>
 
    
 

	
 
    <h3>${_('Repositories defaults')}</h3>
 

	
 
    ${h.form(url('default', id='defaults'),method='put')}
 
    <div class="form">
 
        <!-- fields -->
 

	
 
        <div class="fields">
 
        
 

	
 
            <div class="field">
 
                <div class="label">
 
                    <label for="default_repo_type">${_('Type')}:</label>
 
                </div>
 
                <div class="input">
 
                    ${h.select('default_repo_type','hg',c.backends,class_="medium")}
 
                </div>
 
            </div>        
 
            </div>
 

	
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="default_repo_private">${_('Private repository')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('default_repo_private',value="True")}
 
                    <span class="help-block">${_('Private repositories are only visible to people explicitly added as collaborators.')}</span>
 
                </div>
 
            </div>
 

	
 

	
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="default_repo_enable_statistics">${_('Enable statistics')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('default_repo_enable_statistics',value="True")}
 
                    <span class="help-block">${_('Enable statistics window on summary page.')}</span>
 
                </div>
 
            </div>
 

	
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="default_repo_enable_downloads">${_('Enable downloads')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('default_repo_enable_downloads',value="True")}
 
                    <span class="help-block">${_('Enable download menu on summary page.')}</span>
 
                </div>
 
            </div>
 

	
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="default_repo_enable_locking">${_('Enable locking')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('default_repo_enable_locking',value="True")}
 
                    <span class="help-block">${_('Enable lock-by-pulling on repository.')}</span>
 
                </div>
 
            </div>
 

	
 
            <div class="buttons">
 
            ${h.submit('save',_('Save'),class_="ui-btn large")}
 
            </div>
 
        </div>
 
    </div>
 
    ${h.end_form()}
 
    
 

	
 
    ##<h3>${_('Groups defaults')}</h3>
 
    
 

	
 
</div>
 
</%def>
rhodecode/templates/admin/permissions/permissions.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="/base/base.html"/>
 

	
 
<%def name="title()">
 
    ${_('Permissions administration')} - ${c.rhodecode_name}
 
</%def>
 

	
 
<%def name="breadcrumbs_links()">
 
    ${h.link_to(_('Admin'),h.url('admin_home'))}
 
    &raquo;
 
    ${_('Permissions')}
 
</%def>
 

	
 
<%def name="page_nav()">
 
	${self.menu('admin')}
 
</%def>
 

	
 
<%def name="main()">
 
<div class="box">
 
    <!-- box / title -->
 
    <div class="title">
 
        ${self.breadcrumbs()}
 
    </div>
 
    <h3>${_('Default permissions')}</h3>
 
    ${h.form(url('permission', id='default'),method='put')}
 
    <div class="form">
 
        <!-- fields -->
 
        <div class="fields">
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="anonymous">${_('Anonymous access')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    <div class="checkbox">
 
                        ${h.checkbox('anonymous',True)}
 
                    </div>
 
                </div>
 
            </div>
 
			<div class="field">
 
				<div class="label">
 
					<label for="default_repo_perm">${_('Repository')}:</label>
 
				</div>
 
				<div class="select">
 
					${h.select('default_repo_perm','',c.repo_perms_choices)}
 

	
 
	                ${h.checkbox('overwrite_default_repo','true')}
 
	                <label for="overwrite_default_repo">
 
	                <span class="tooltip"
 
	                title="${h.tooltip(_('All default permissions on each repository will be reset to choosen permission, note that all custom default permission on repositories will be lost'))}">
 
	                ${_('overwrite existing settings')}</span> </label>
 
				</div>
 
			</div>
 
			<div class="field">
 
				<div class="label">
 
					<label for="default_group_perm">${_('Repository group')}:</label>
 
				</div>
 
				<div class="select">
 
					${h.select('default_group_perm','',c.group_perms_choices)}
 
                    ${h.checkbox('overwrite_default_group','true')}
 
                    <label for="overwrite_default_group">
 
                    <span class="tooltip"
 
                    title="${h.tooltip(_('All default permissions on each repository group will be reset to choosen permission, note that all custom default permission on repositories group will be lost'))}">
 
                    ${_('overwrite existing settings')}</span> </label>
 
          
 

	
 
				</div>
 
			</div>
 
			<div class="field">
 
		        <div class="label">
 
		            <label for="default_register">${_('Registration')}:</label>
 
		        </div>
 
				<div class="select">
 
					${h.select('default_register','',c.register_choices)}
 
				</div>
 
			</div>
 
             <div class="field">
 
                <div class="label">
 
                    <label for="default_create">${_('Repository creation')}:</label>
 
                </div>
 
				<div class="select">
 
					${h.select('default_create','',c.create_choices)}
 
				</div>
 
             </div>
 
             <div class="field">
 
                <div class="label">
 
                    <label for="default_fork">${_('Repository forking')}:</label>
 
                </div>
 
                <div class="select">
 
                    ${h.select('default_fork','',c.fork_choices)}
 
                </div>
 
             </div>
 
	        <div class="buttons">
 
	        ${h.submit('set',_('set'),class_="ui-btn large")}
 
	        </div>
 
        </div>
 
    </div>
 
    ${h.end_form()}
 
</div>
 
</%def>
rhodecode/templates/admin/repos/repo_add_base.html
Show inline comments
 
## -*- coding: utf-8 -*-
 

	
 
${h.form(url('repos'))}
 
<div class="form">
 
    <!-- fields -->
 
    <div class="fields">
 
        <div class="field">
 
            <div class="label">
 
                <label for="repo_name">${_('Name')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.text('repo_name',c.new_repo,class_="small")}
 
                %if not h.HasPermissionAll('hg.admin')('repo create form'):
 
                    ${h.hidden('user_created',True)}
 
                %endif                
 
                %endif
 
            </div>
 
         </div>
 
        <div class="field">
 
            <div class="label">
 
                <label for="clone_uri">${_('Clone from')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.text('clone_uri',class_="small")}
 
                <span class="help-block">${_('Optional http[s] url from which repository should be cloned.')}</span>
 
            </div>
 
         </div>
 
         <div class="field">
 
             <div class="label">
 
                 <label for="repo_group">${_('Repository group')}:</label>
 
             </div>
 
             <div class="input">
 
                 ${h.select('repo_group',request.GET.get('parent_group'),c.repo_groups,class_="medium")}
 
                 <span class="help-block">${_('Optionaly select a group to put this repository into.')}</span>
 
             </div>
 
         </div>
 
        <div class="field">
 
            <div class="label">
 
                <label for="repo_type">${_('Type')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.select('repo_type','hg',c.backends,class_="small")}
 
                <span class="help-block">${_('Type of repository to create.')}</span>
 
            </div>
 
         </div>
 
         <div class="field">
 
            <div class="label">
 
                <label for="repo_landing_rev">${_('Landing revision')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.select('repo_landing_rev','',c.landing_revs,class_="medium")}
 
                <span class="help-block">${_('Default revision for files page, downloads, whoosh and readme')}</span>
 
            </div>
 
        </div>
 
        <div class="field">
 
            <div class="label label-textarea">
 
                <label for="repo_description">${_('Description')}:</label>
 
            </div>
 
            <div class="textarea text-area editor">
 
                ${h.textarea('repo_description')}
 
                <span class="help-block">${_('Keep it short and to the point. Use a README file for longer descriptions.')}</span>
 
            </div>
 
         </div>
 
        <div class="field">
 
            <div class="label label-checkbox">
 
                <label for="repo_private">${_('Private repository')}:</label>
 
            </div>
 
            <div class="checkboxes">
 
                ${h.checkbox('repo_private',value="True")}
 
                <span class="help-block">${_('Private repositories are only visible to people explicitly added as collaborators.')}</span>
 
            </div>
 
         </div>
 
        <div class="buttons">
 
          ${h.submit('add',_('add'),class_="ui-btn large")}
 
        </div>
 
    </div>
 
</div>
 
${h.end_form()}
rhodecode/tests/__init__.py
Show inline comments
 
@@ -89,98 +89,96 @@ uniq_suffix = str(int(time.mktime(dateti
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from rhodecode.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.Session = Session
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['rhodecode_user']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['rhodecode_user']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def checkSessionFlash(self, response, msg):
 
        self.assertTrue('flash' in response.session)
 
        if not msg in response.session['flash'][0][1]:
 
            self.fail(
 
                'msg `%s` not found in session flash: got `%s` instead' % (
 
                      msg, response.session['flash'])
 
            )
 

	
 

	
 
## HELPERS ##
 

	
 
def _get_repo_create_params(**custom):
 
    defs = {
 
        'repo_name': None,
 
        'repo_type': 'hg',
 
        'clone_uri': '',
 
        'repo_group': '',
 
        'repo_description': 'DESC',
 
        'repo_private': False,
 
        'repo_landing_rev': 'tip'
 
    }
 
    defs.update(custom)
 
    if 'repo_name_full' not in custom:
 
        defs.update({'repo_name_full': defs['repo_name']})
 

	
 
    return defs
 

	
 

	
rhodecode/tests/functional/test_admin_repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import os
 
from rhodecode.lib import vcs
 

	
 
from rhodecode.model.db import Repository, RepoGroup
 
from rhodecode.tests import *
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 

	
 

	
 
class TestAdminReposController(TestController):
 

	
 
    def __make_repo(self):
 
        pass
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('repos'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_repos', format='xml'))
 

	
 
    def test_create_hg(self):
 
        self.log_user()
 
        repo_name = NEW_HG_REPO
 
        description = 'description for newly created repo'
 
        response = self.app.post(url('repos'), 
 
        response = self.app.post(url('repos'),
 
                        _get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_description=description))
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session().query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_create_hg_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
 
        repo_name_unicode = repo_name.decode('utf8')
 
        description = 'description for newly created repo' + non_ascii
 
        description_unicode = description.decode('utf8')
 
        private = False
 
        response = self.app.post(url('repos'),
 
                        _get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_description=description))
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name_unicode))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session().query(Repository)\
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_unicode)
 
        self.assertEqual(new_repo.description, description_unicode)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_create_hg_in_group(self):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = 'sometest'
 
        gr = ReposGroupModel().create(group_name=group_name,
 
                                      group_description='test',)
 
        self.Session().commit()
 

	
 
        repo_name = 'ingroup'
 
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
 
        description = 'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        _get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,))
 

	
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session().query(Repository)\
 
            .filter(Repository.repo_name == repo_name_full).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name_full)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
 
        except:
 
            ReposGroupModel().delete(group_name)
 
            self.Session().commit()
 
@@ -169,158 +169,158 @@ class TestAdminReposController(TestContr
 
        description_unicode = description.decode('utf8')
 
        private = False
 
        response = self.app.post(url('repos'),
 
                        _get_repo_create_params(repo_private=False,
 
                                                repo_type='git',
 
                                                repo_name=repo_name,
 
                                                repo_description=description))
 

	
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name_unicode))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session().query(Repository)\
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_unicode)
 
        self.assertEqual(new_repo.description, description_unicode)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_repo'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_repo', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('repo', repo_name=HG_REPO))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name=HG_REPO),
 
                                 params=dict(_method='put'))
 

	
 
    def test_delete_hg(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete'
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'),
 
                        _get_repo_create_params(repo_private=False,
 
                                                repo_type='hg',
 
                                                repo_name=repo_name,
 
                                                repo_description=description))
 

	
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session().query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.delete(url('repo', repo_name=repo_name))
 

	
 
        self.assertTrue('''deleted repository %s''' % (repo_name) in
 
                        response.session['flash'][0])
 

	
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = self.Session().query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
                                  False)
 

	
 
    def test_delete_git(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete'
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), 
 
        response = self.app.post(url('repos'),
 
                        _get_repo_create_params(repo_private=False,
 
                                                repo_type='git',
 
                                                repo_name=repo_name,
 
                                                repo_description=description))
 

	
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session().query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.delete(url('repo', repo_name=repo_name))
 

	
 
        self.assertTrue('''deleted repository %s''' % (repo_name) in
 
                        response.session['flash'][0])
 

	
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = self.Session().query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
                                  False)
 

	
 
    def test_delete_repo_with_group(self):
 
        #TODO:
 
        pass
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name=HG_REPO),
 
                                 params=dict(_method='delete'))
 

	
 
    def test_show_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('repo', repo_name=HG_REPO))
 

	
 
    def test_show_git(self):
 
        self.log_user()
 
        response = self.app.get(url('repo', repo_name=GIT_REPO))
 

	
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_repo', repo_name=HG_REPO))
0 comments (0 inline, 0 general)