Changeset - 9492ab68331f
[Not reviewed]
rhodecode/controllers/admin/repos.py
Show inline comments
 
@@ -49,94 +49,97 @@ from rhodecode.model.repo import RepoMod
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ReposController(BaseController):
 
    """
 
    REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('repo', 'repos')
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def __before__(self):
 
        c.admin_user = session.get('admin_user')
 
        c.admin_username = session.get('admin_username')
 
        super(ReposController, self).__before__()
 

	
 
    def __load_defaults(self):
 
        c.repo_groups = RepoGroup.groups_choices()
 
        c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
 

	
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.users_groups_array = repo_model.get_users_groups_js()
 
        c.landing_revs = ScmModel().get_repo_landing_revs()
 

	
 
    def __load_data(self, repo_name=None):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param repo_name:
 
        """
 
        self.__load_defaults()
 

	
 
        c.repo_info = db_repo = Repository.get_by_repo_name(repo_name)
 
        repo = db_repo.scm_instance
 

	
 
        if c.repo_info is None:
 
            h.flash(_('%s repository is not mapped to db perhaps'
 
                      ' it was created or renamed from the filesystem'
 
                      ' please run the application again'
 
                      ' in order to rescan repositories') % repo_name,
 
                      category='error')
 

	
 
            return redirect(url('repos'))
 

	
 
        c.landing_revs = ScmModel().get_repo_landing_revs(c.repo_info)
 
        c.default_user_id = User.get_by_username('default').user_id
 
        c.in_public_journal = UserFollowing.query()\
 
            .filter(UserFollowing.user_id == c.default_user_id)\
 
            .filter(UserFollowing.follows_repository == c.repo_info).scalar()
 

	
 
        if c.repo_info.stats:
 
            # this is on what revision we ended up so we add +1 for count
 
            last_rev = c.repo_info.stats.stat_on_revision + 1
 
        else:
 
            last_rev = 0
 
        c.stats_revision = last_rev
 

	
 
        c.repo_last_rev = repo.count() if repo.revisions else 0
 

	
 
        if last_rev == 0 or c.repo_last_rev == 0:
 
            c.stats_percentage = 0
 
        else:
 
            c.stats_percentage = '%.2f' % ((float((last_rev)) /
 
                                            c.repo_last_rev) * 100)
 

	
 
        defaults = RepoModel()._get_defaults(repo_name)
 

	
 
        c.repos_list = [('', _('--REMOVE FORK--'))]
 
        c.repos_list += [(x.repo_id, x.repo_name) for x in
 
                   Repository.query().order_by(Repository.repo_name).all()]
 

	
 
        return defaults
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def index(self, format='html'):
 
        """GET /repos: All items in the collection"""
 
        # url('repos')
 

	
 
        c.repos_list = ScmModel().get_repos(Repository.query()
 
                                            .order_by(Repository.repo_name)
 
                                            .all(), sort_key='name_sort')
 
        return render('admin/repos/repos.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def create(self):
 
        """
 
        POST /repos: Create a new item"""
 
        # url('repos')
 

	
 
        self.__load_defaults()
 
        form_result = {}
 
        try:
 
            form_result = RepoForm(repo_groups=c.repo_groups_choices)()\
 
                            .to_python(dict(request.POST))
 
            RepoModel().create(form_result, self.rhodecode_user)
rhodecode/controllers/admin/settings.py
Show inline comments
 
@@ -376,48 +376,49 @@ class SettingsController(BaseController)
 
                .all()
 
            c.user_repos = ScmModel().get_repos(all_repos)
 

	
 
            c.form = htmlfill.render(
 
                render('admin/users/user_edit_my_account_form.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8")
 
            return render('admin/users/user_edit_my_account.html')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('error occurred during update of user %s') \
 
                    % form_result.get('username'), category='error')
 

	
 
        return redirect(url('my_account'))
 

	
 
    @NotAnonymous()
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def create_repository(self):
 
        """GET /_admin/create_repository: Form to create a new item"""
 

	
 
        c.repo_groups = RepoGroup.groups_choices()
 
        c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
 
        c.landing_revs = ScmModel().get_repo_landing_revs()
 

	
 
        new_repo = request.GET.get('repo', '')
 
        c.new_repo = repo_name_slug(new_repo)
 

	
 
        return render('admin/repos/repo_add_create_repository.html')
 

	
 
    def get_hg_ui_settings(self):
 
        ret = self.sa.query(RhodeCodeUi).all()
 

	
 
        if not ret:
 
            raise Exception('Could not get application ui settings !')
 
        settings = {}
 
        for each in ret:
 
            k = each.ui_key
 
            v = each.ui_value
 
            if k == '/':
 
                k = 'root_path'
 

	
 
            if k.find('.') != -1:
 
                k = k.replace('.', '_')
 

	
 
            if each.ui_section == 'hooks':
 
                v = each.ui_active
 

	
rhodecode/lib/utils.py
Show inline comments
 
@@ -427,49 +427,50 @@ def repo2db_mapper(initial_repo_list, re
 

	
 
    :param initial_repo_list: list of repositories found by scanning methods
 
    :param remove_obsolete: check for obsolete entries in database
 
    """
 
    from rhodecode.model.repo import RepoModel
 
    sa = meta.Session
 
    rm = RepoModel()
 
    user = sa.query(User).filter(User.admin == True).first()
 
    if user is None:
 
        raise Exception('Missing administrative account !')
 
    added = []
 

	
 
    for name, repo in initial_repo_list.items():
 
        group = map_groups(name)
 
        if not rm.get_by_repo_name(name, cache=False):
 
            log.info('repository %s not found creating default' % name)
 
            added.append(name)
 
            form_data = {
 
             'repo_name': name,
 
             'repo_name_full': name,
 
             'repo_type': repo.alias,
 
             'description': repo.description \
 
                if repo.description != 'unknown' else '%s repository' % name,
 
             'private': False,
 
             'group_id': getattr(group, 'group_id', None)
 
             'group_id': getattr(group, 'group_id', None),
 
             'landing_rev': repo.DEFAULT_BRANCH_NAME
 
            }
 
            rm.create(form_data, user, just_db=True)
 
    sa.commit()
 
    removed = []
 
    if remove_obsolete:
 
        # remove from database those repositories that are not in the filesystem
 
        for repo in sa.query(Repository).all():
 
            if repo.repo_name not in initial_repo_list.keys():
 
                log.debug("Removing non existing repository found in db %s" %
 
                          repo.repo_name)
 
                removed.append(repo.repo_name)
 
                sa.delete(repo)
 
                sa.commit()
 

	
 
    # clear cache keys
 
    log.debug("Clearing cache keys now...")
 
    CacheInvalidation.clear_cache()
 
    sa.commit()
 
    return added, removed
 

	
 

	
 
# set cache regions for beaker so celery can utilise it
 
def add_cache(settings):
 
    cache_settings = {'regions': None}
 
@@ -537,89 +538,94 @@ def create_test_index(repo_location, con
 
    from rhodecode.lib.pidlock import DaemonLock, LockHeld
 

	
 
    repo_location = repo_location
 

	
 
    index_location = os.path.join(config['app_conf']['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    try:
 
        l = DaemonLock(file_=jn(dn(index_location), 'make_index.lock'))
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location)\
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
 

	
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Makes a fresh database and
 
    install test repository into tmp dir
 
    """
 
    from rhodecode.lib.db_manage import DbManage
 
    from rhodecode.tests import HG_REPO, TESTS_TMP_PATH
 
    from rhodecode.tests import HG_REPO, GIT_REPO, TESTS_TMP_PATH
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.db1.url']
 
    log.debug('making test db %s' % dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s' % repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session.commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['app_conf']['index_dir']
 
    data_path = config['app_conf']['cache_dir']
 

	
 
    #clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s' % idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s' % data_path)
 
        shutil.rmtree(data_path)
 

	
 
    #CREATE DEFAULT HG REPOSITORY
 
    #CREATE DEFAULT TEST REPOS
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_git.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    #LOAD VCS test stuff
 
    from rhodecode.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
#==============================================================================
 
# PASTER COMMANDS
 
#==============================================================================
 
class BasePasterCommand(Command):
 
    """
 
    Abstract Base Class for paster commands.
 

	
 
    The celery commands are somewhat aggressive about loading
 
    celery.conf, and since our module sets the `CELERY_LOADER`
 
    environment variable to our loader, we have to bootstrap a bit and
 
    make sure we've had a chance to load the pylons config off of the
 
    command line, otherwise everything fails.
 
    """
 
    min_args = 1
 
    min_args_error = "Please provide a paster config file as an argument."
 
    takes_config_file = 1
 
    requires_config_file = True
 

	
 
    def notify_msg(self, msg, log=False):
rhodecode/model/db.py
Show inline comments
 
@@ -274,49 +274,49 @@ class RhodeCodeUi(Base, BaseModel):
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_by_key(key).scalar() or cls()
 
        new_ui.ui_section = 'hooks'
 
        new_ui.ui_active = True
 
        new_ui.ui_key = key
 
        new_ui.ui_value = val
 

	
 
        Session.add(new_ui)
 

	
 

	
 
class User(Base, BaseModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        UniqueConstraint('username'), UniqueConstraint('email'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    username = Column("username", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    password = Column("password", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    active = Column("active", Boolean(), nullable=True, unique=None, default=None)
 
    admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
 
    name = Column("name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    name = Column("firstname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    lastname = Column("lastname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    _email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
 
    ldap_dn = Column("ldap_dn", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    api_key = Column("api_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 

	
 
    user_log = relationship('UserLog', cascade='all')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UsersGroupMember', cascade='all')
 

	
 
    notifications = relationship('UserNotification', cascade='all')
 
    # notifications assigned to this user
 
    user_created_notifications = relationship('Notification', cascade='all')
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
@@ -479,72 +479,73 @@ class UsersGroupMember(Base, BaseModel):
 

	
 
    users_group_member_id = Column("users_group_member_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
 

	
 
    user = relationship('User', lazy='joined')
 
    users_group = relationship('UsersGroup')
 

	
 
    def __init__(self, gr_id='', u_id=''):
 
        self.users_group_id = gr_id
 
        self.user_id = u_id
 

	
 

	
 
class Repository(Base, BaseModel):
 
    __tablename__ = 'repositories'
 
    __table_args__ = (
 
        UniqueConstraint('repo_name'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 

	
 
    repo_id = Column("repo_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    repo_name = Column("repo_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
 
    clone_uri = Column("clone_uri", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
 
    repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default='hg')
 
    repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
 
    private = Column("private", Boolean(), nullable=True, unique=None, default=None)
 
    enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
 
    enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
 
    description = Column("description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
 
    landing_rev = Column("landing_revision", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
 

	
 
    fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
 
    group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
 

	
 
    user = relationship('User')
 
    fork = relationship('Repository', remote_side=repo_id)
 
    group = relationship('RepoGroup')
 
    repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
 
    users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
 
    stats = relationship('Statistics', cascade='all', uselist=False)
 

	
 
    followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id', cascade='all')
 

	
 
    logs = relationship('UserLog')
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (self.__class__.__name__,self.repo_id,
 
        return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
 
                                   self.repo_name)
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def get_by_repo_name(cls, repo_name):
 
        q = Session.query(cls).filter(cls.repo_name == repo_name)
 
        q = q.options(joinedload(Repository.fork))\
 
                .options(joinedload(Repository.user))\
 
                .options(joinedload(Repository.group))
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_full_path(cls, repo_full_path):
 
        repo_name = repo_full_path.split(cls.base_path(), 1)[-1]
 
        return cls.get_by_repo_name(repo_name.strip(URL_SEP))
 

	
 
    @classmethod
 
    def get_repo_forks(cls, repo_id):
 
        return cls.query().filter(Repository.fork_id == repo_id)
 

	
 
    @classmethod
rhodecode/model/forms.py
Show inline comments
 
@@ -641,93 +641,94 @@ def RegisterForm(edit=False, old_data={}
 

	
 
        chained_validators = [ValidPasswordsMatch, ValidPassword]
 

	
 
    return _RegisterForm
 

	
 

	
 
def PasswordResetForm():
 
    class _PasswordResetForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        email = All(ValidSystemEmail(), Email(not_empty=True))
 
    return _PasswordResetForm
 

	
 

	
 
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
             repo_groups=[]):
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                        SlugifyName())
 
        clone_uri = All(UnicodeString(strip=True, min=1, not_empty=False))
 
        repo_group = OneOf(repo_groups, hideList=True)
 
        repo_type = OneOf(supported_backends)
 
        description = UnicodeString(strip=True, min=1, not_empty=True)
 
        description = UnicodeString(strip=True, min=1, not_empty=False)
 
        private = StringBoolean(if_missing=False)
 
        enable_statistics = StringBoolean(if_missing=False)
 
        enable_downloads = StringBoolean(if_missing=False)
 
        landing_rev = UnicodeString(strip=True, min=1, not_empty=True)
 

	
 
        if edit:
 
            #this is repo owner
 
            user = All(UnicodeString(not_empty=True), ValidRepoUser)
 

	
 
        chained_validators = [ValidCloneUri()(),
 
                              ValidRepoName(edit, old_data),
 
                              ValidPerms()]
 
    return _RepoForm
 

	
 

	
 
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
                 repo_groups=[]):
 
def RepoForkForm(edit=False, old_data={},
 
                 supported_backends=BACKENDS.keys(), repo_groups=[]):
 
    class _RepoForkForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                        SlugifyName())
 
        repo_group = OneOf(repo_groups, hideList=True)
 
        repo_type = All(ValidForkType(old_data), OneOf(supported_backends))
 
        description = UnicodeString(strip=True, min=1, not_empty=True)
 
        private = StringBoolean(if_missing=False)
 
        copy_permissions = StringBoolean(if_missing=False)
 
        update_after_clone = StringBoolean(if_missing=False)
 
        fork_parent_id = UnicodeString()
 
        chained_validators = [ValidForkName(edit, old_data)]
 

	
 
    return _RepoForkForm
 

	
 

	
 
def RepoSettingsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
                     repo_groups=[]):
 
def RepoSettingsForm(edit=False, old_data={},
 
                     supported_backends=BACKENDS.keys(), repo_groups=[]):
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                        SlugifyName())
 
        description = UnicodeString(strip=True, min=1, not_empty=True)
 
        repo_group = OneOf(repo_groups, hideList=True)
 
        private = StringBoolean(if_missing=False)
 

	
 
        landing_rev = UnicodeString(strip=True, min=1, not_empty=True)
 
        chained_validators = [ValidRepoName(edit, old_data), ValidPerms(),
 
                              ValidSettings]
 
    return _RepoForm
 

	
 

	
 
def ApplicationSettingsForm():
 
    class _ApplicationSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        rhodecode_title = UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_realm = UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_ga_code = UnicodeString(strip=True, min=1, not_empty=False)
 

	
 
    return _ApplicationSettingsForm
 

	
 

	
 
def ApplicationUiSettingsForm():
 
    class _ApplicationUiSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        web_push_ssl = OneOf(['true', 'false'], if_missing='false')
 
        paths_root_path = All(ValidPath(), UnicodeString(strip=True, min=1, not_empty=True))
 
        hooks_changegroup_update = OneOf(['True', 'False'], if_missing=False)
 
        hooks_changegroup_repo_size = OneOf(['True', 'False'], if_missing=False)
rhodecode/model/scm.py
Show inline comments
 
@@ -8,48 +8,49 @@
 
    :created_on: Apr 9, 2010
 
    :author: marcink
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
import os
 
import time
 
import traceback
 
import logging
 
import cStringIO
 

	
 
from sqlalchemy import func
 
from pylons.i18n.translation import _
 

	
 
from rhodecode.lib.vcs import get_backend
 
from rhodecode.lib.vcs.exceptions import RepositoryError
 
from rhodecode.lib.vcs.utils.lazy import LazyProperty
 
from rhodecode.lib.vcs.nodes import FileNode
 

	
 
from rhodecode import BACKENDS
 
from rhodecode.lib import helpers as h
 
from rhodecode.lib.utils2 import safe_str, safe_unicode
 
from rhodecode.lib.auth import HasRepoPermissionAny, HasReposGroupPermissionAny
 
from rhodecode.lib.utils import get_repos as get_filesystem_repos, make_ui, \
 
    action_logger, EmptyChangeset, REMOVED_REPO_PAT
 
from rhodecode.model import BaseModel
 
from rhodecode.model.db import Repository, RhodeCodeUi, CacheInvalidation, \
 
    UserFollowing, UserLog, User, RepoGroup
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserTemp(object):
 
    def __init__(self, user_id):
 
        self.user_id = user_id
 

	
 
    def __repr__(self):
 
@@ -451,24 +452,54 @@ class ScmModel(BaseModel):
 
        :param flat: return as a list, if False returns a dict with decription
 

	
 
        """
 
        _files = list()
 
        _dirs = list()
 
        try:
 
            _repo = self.__get_repo(repo_name)
 
            changeset = _repo.scm_instance.get_changeset(revision)
 
            root_path = root_path.lstrip('/')
 
            for topnode, dirs, files in changeset.walk(root_path):
 
                for f in files:
 
                    _files.append(f.path if flat else {"name": f.path,
 
                                                       "type": "file"})
 
                for d in dirs:
 
                    _dirs.append(d.path if flat else {"name": d.path,
 
                                                      "type": "dir"})
 
        except RepositoryError:
 
            log.debug(traceback.format_exc())
 
            raise
 

	
 
        return _dirs, _files
 

	
 
    def get_unread_journal(self):
 
        return self.sa.query(UserLog).count()
 

	
 
    def get_repo_landing_revs(self, repo=None):
 
        """
 
        Generates select option with tags branches and bookmarks (for hg only)
 
        grouped by type
 

	
 
        :param repo:
 
        :type repo:
 
        """
 
        hist_l = []
 
        repo = self.__get_repo(repo)
 
        hist_l.append(['tip', _('latest tip')])
 
        if not repo:
 
            return hist_l
 

	
 
        repo = repo.scm_instance
 
        branches_group = ([(k, k) for k, v in
 
                           repo.branches.iteritems()], _("Branches"))
 
        hist_l.append(branches_group)
 

	
 
        if repo.alias == 'hg':
 
            bookmarks_group = ([(k, k) for k, v in
 
                                repo.bookmarks.iteritems()], _("Bookmarks"))
 
            hist_l.append(bookmarks_group)
 

	
 
        tags_group = ([(k, k) for k, v in
 
                       repo.tags.iteritems()], _("Tags"))
 
        hist_l.append(tags_group)
 

	
 
        return hist_l
rhodecode/templates/admin/repos/repo_add_base.html
Show inline comments
 
@@ -21,48 +21,57 @@ ${h.form(url('repos'))}
 
            </div>
 
            <div class="input">
 
                ${h.text('clone_uri',class_="small")}
 
                <span class="help-block">${_('Optional http[s] url from which repository should be cloned.')}</span>
 
            </div>
 
         </div>
 
         <div class="field">
 
             <div class="label">
 
                 <label for="repo_group">${_('Repository group')}:</label>
 
             </div>
 
             <div class="input">
 
                 ${h.select('repo_group',request.GET.get('parent_group'),c.repo_groups,class_="medium")}
 
                 <span class="help-block">${_('Optional select a group to put this repository into.')}</span>
 
             </div>
 
         </div>
 
        <div class="field">
 
            <div class="label">
 
                <label for="repo_type">${_('Type')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.select('repo_type','hg',c.backends,class_="small")}
 
                <span class="help-block">${_('Type of repository to create.')}</span>
 
            </div>
 
         </div>
 
         <div class="field">
 
            <div class="label">
 
                <label for="landing_rev">${_('Landing revision')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.select('landing_rev','',c.landing_revs,class_="medium")}
 
                <span class="help-block">${_('Default revision for files page, downloads, whoosh and readme')}</span>
 
            </div>
 
        </div>           
 
        <div class="field">
 
            <div class="label label-textarea">
 
                <label for="description">${_('Description')}:</label>
 
            </div>
 
            <div class="textarea text-area editor">
 
                ${h.textarea('description')}
 
                <span class="help-block">${_('Keep it short and to the point. Use a README file for longer descriptions.')}</span>
 
            </div>
 
         </div>
 
        <div class="field">
 
            <div class="label label-checkbox">
 
                <label for="private">${_('Private repository')}:</label>
 
            </div>
 
            <div class="checkboxes">
 
                ${h.checkbox('private',value="True")}
 
                <span class="help-block">${_('Private repositories are only visible to people explicitly added as collaborators.')}</span>
 
            </div>
 
         </div>
 
        <div class="buttons">
 
          ${h.submit('add',_('add'),class_="ui-button")}
 
        </div>
 
    </div>
 
</div>
 
${h.end_form()}
rhodecode/templates/admin/repos/repo_edit.html
Show inline comments
 
@@ -41,48 +41,57 @@
 
	           </div>
 
	           <div class="input">
 
	               ${h.text('clone_uri',class_="medium")}
 
                 <span class="help-block">${_('Optional http[s] url from which repository should be cloned.')}</span>
 
	           </div>
 
	        </div>
 
	        <div class="field">
 
	            <div class="label">
 
	                <label for="repo_group">${_('Repository group')}:</label>
 
	            </div>
 
	            <div class="input">
 
	                ${h.select('repo_group','',c.repo_groups,class_="medium")}
 
                    <span class="help-block">${_('Optional select a group to put this repository into.')}</span>
 
	            </div>
 
	        </div>
 
            <div class="field">
 
                <div class="label">
 
                    <label for="repo_type">${_('Type')}:</label>
 
                </div>
 
                <div class="input">
 
                    ${h.select('repo_type','hg',c.backends,class_="medium")}
 
                </div>
 
            </div>
 
            <div class="field">
 
                <div class="label">
 
                    <label for="landing_rev">${_('Landing revision')}:</label>
 
                </div>
 
                <div class="input">
 
                    ${h.select('landing_rev','',c.landing_revs,class_="medium")}
 
                    <span class="help-block">${_('Default revision for files page, downloads, whoosh and readme')}</span>
 
                </div>
 
            </div>            
 
            <div class="field">
 
                <div class="label label-textarea">
 
                    <label for="description">${_('Description')}:</label>
 
                </div>
 
                <div class="textarea text-area editor">
 
                    ${h.textarea('description')}
 
                    <span class="help-block">${_('Keep it short and to the point. Use a README file for longer descriptions.')}</span>
 
                </div>
 
            </div>
 

	
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="private">${_('Private repository')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('private',value="True")}
 
                    <span class="help-block">${_('Private repositories are only visible to people explicitly added as collaborators.')}</span>
 
                </div>
 
            </div>
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="enable_statistics">${_('Enable statistics')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('enable_statistics',value="True")}
 
@@ -189,43 +198,43 @@
 
               <ul>
 
                    <li>${_('''All actions made on this repository will be accessible to everyone in public journal''')}
 
                    </li>
 
               </ul>
 
               </div>
 
        </div>
 
        ${h.end_form()}
 

	
 
        <h3>${_('Delete')}</h3>
 
        ${h.form(url('repo', repo_name=c.repo_info.repo_name),method='delete')}
 
        <div class="form">
 
           <div class="fields">
 
               ${h.submit('remove_%s' % c.repo_info.repo_name,_('Remove this repository'),class_="ui-btn red",onclick="return confirm('"+_('Confirm to delete this repository')+"');")}
 
           </div>
 
           <div class="field" style="border:none;color:#888">
 
           <ul>
 
                <li>${_('''This repository will be renamed in a special way in order to be unaccesible for RhodeCode and VCS systems.
 
                         If you need fully delete it from filesystem please do it manually''')}
 
                </li>
 
           </ul>
 
           </div>
 
        </div>
 
        ${h.end_form()}
 

	
 
        <h3>${_('Set as fork')}</h3>
 
        <h3>${_('Set as fork of')}</h3>
 
        ${h.form(url('repo_as_fork', repo_name=c.repo_info.repo_name),method='put')}
 
        <div class="form">
 
           <div class="fields">
 
               ${h.select('id_fork_of','',c.repos_list,class_="medium")}
 
               ${h.submit('set_as_fork_%s' % c.repo_info.repo_name,_('set'),class_="ui-btn",)}
 
           </div>
 
               <div class="field" style="border:none;color:#888">
 
               <ul>
 
                    <li>${_('''Manually set this repository as a fork of another''')}</li>
 
                    <li>${_('''Manually set this repository as a fork of another from the list''')}</li>
 
               </ul>
 
               </div>
 
        </div>
 
        ${h.end_form()}
 

	
 
</div>
 

	
 

	
 
</%def>
rhodecode/tests/__init__.py
Show inline comments
 
@@ -17,130 +17,130 @@ from os.path import join as jn
 

	
 
from unittest import TestCase
 
from tempfile import _RandomNameSequence
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 

	
 
from rhodecode import is_windows
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User
 

	
 
import pylons.test
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
 
    'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK',
 
    'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
 
    'environ', 'url', 'get_new_dir', 'TestController', 'TESTS_TMP_PATH',
 
    'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK',
 
    'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
 
    'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
 
    'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
 
    'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_GIT_REPO',
 
    'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
 
    'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
 
    'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
 
    'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
# nosetests --pdb --pdb-failures
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
THIS = os.path.abspath(os.path.dirname(__file__))
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, 'vcs-hg')
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from rhodecode.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
PACKAGE_DIR = os.path.abspath(os.path.join(
 
    os.path.dirname(__file__), '..'))
 

	
 
TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
 

	
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.Session = Session
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['rhodecode_user']
rhodecode/tests/functional/test_admin_repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import os
 
from rhodecode.lib import vcs
 

	
 
from rhodecode.model.db import Repository
 
from rhodecode.tests import *
 

	
 

	
 
class TestAdminReposController(TestController):
 

	
 
    def __make_repo(self):
 
        pass
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('repos'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_repos', format='xml'))
 

	
 
    def test_create_hg(self):
 
        self.log_user()
 
        repo_name = NEW_HG_REPO
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'hg',
 
                                                'clone_uri':'',
 
                                                'repo_group':'',
 
                                                'description':description,
 
                                                'private':private})
 
        self.checkSessionFlash(response, 'created repository %s' % (repo_name))
 
        response = self.app.post(url('repos'), {'repo_name': repo_name,
 
                                                'repo_type': 'hg',
 
                                                'clone_uri': '',
 
                                                'repo_group': '',
 
                                                'description': description,
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository).filter(Repository.repo_name ==
 
                                                    repo_name).one()
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        self.assertTrue(repo_name in response.body)
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo in filesystem')
 

	
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_create_hg_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
 
        repo_name_unicode = repo_name.decode('utf8')
 
        description = 'description for newly created repo' + non_ascii
 
        description_unicode = description.decode('utf8')
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'hg',
 
                                                'clone_uri':'',
 
                                                'repo_group':'',
 
                                                'description':description,
 
                                                'private':private})
 
        response = self.app.post(url('repos'), {'repo_name': repo_name,
 
                                                'repo_type': 'hg',
 
                                                'clone_uri': '',
 
                                                'repo_group': '',
 
                                                'description': description,
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name_unicode))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository).filter(Repository.repo_name ==
 
                                                repo_name_unicode).one()
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_unicode)
 
        self.assertEqual(new_repo.description, description_unicode)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        self.assertTrue(repo_name in response.body)
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo in filesystem')
 

	
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_create_hg_in_group(self):
 
        #TODO: write test !
 
        pass
 

	
 
    def test_create_git(self):
 
        return
 
        self.log_user()
 
        repo_name = NEW_GIT_REPO
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'git',
 
                                                'clone_uri':'',
 
                                                'repo_group':'',
 
                                                'description':description,
 
                                                'private':private})
 

	
 
        response = self.app.post(url('repos'), {'repo_name': repo_name,
 
                                                'repo_type': 'git',
 
                                                'clone_uri': '',
 
                                                'repo_group': '',
 
                                                'description': description,
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if we have a message for that repository
 
        assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        #test if the fork was created in the database
 
        new_repo = self.Session.query(Repository).filter(Repository.repo_name == repo_name).one()
 

	
 
        assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
 
        assert new_repo.description == description, 'wrong description'
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        assert repo_name in response.body, 'missing new repo from the main repos list'
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            assert False , 'no repo in filesystem'
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_create_git_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
 
        repo_name_unicode = repo_name.decode('utf8')
 
        description = 'description for newly created repo' + non_ascii
 
        description_unicode = description.decode('utf8')
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name': repo_name,
 
                                                'repo_type': 'git',
 
                                                'clone_uri': '',
 
                                                'repo_group': '',
 
                                                'description': description,
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name_unicode))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_unicode)
 
        self.assertEqual(new_repo.description, description_unicode)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_repo'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_repo', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('repo', repo_name=HG_REPO))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name=HG_REPO),
 
                                 params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
    def test_delete_hg(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete'
 
        description = 'description for newly created repo'
 
        private = False
 

	
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'hg',
 
                                                'clone_uri':'',
 
                                                'repo_group':'',
 
                                                'description':description,
 
                                                'private':private})
 
        self.assertTrue('flash' in response.session)
 

	
 
        #test if we have a message for that repository
 
        self.assertTrue('''created repository %s''' % (repo_name) in
 
                        response.session['flash'][0])
 
        response = self.app.post(url('repos'), {'repo_name': repo_name,
 
                                                'repo_type': 'hg',
 
                                                'clone_uri': '',
 
                                                'repo_group': '',
 
                                                'description': description,
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository).filter(Repository.repo_name ==
 
                                                    repo_name).one()
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        self.assertTrue(repo_name in response.body)
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.delete(url('repo', repo_name=repo_name))
 

	
 
        self.assertTrue('''deleted repository %s''' % (repo_name) in
 
                        response.session['flash'][0])
 

	
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = self.Session.query(Repository).filter(Repository.repo_name
 
                                                        == repo_name).scalar()
 
        deleted_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
                                  False)
 

	
 
    def test_delete_git(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete'
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name': repo_name,
 
                                                'repo_type': 'git',
 
                                                'clone_uri': '',
 
                                                'repo_group': '',
 
                                                'description': description,
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.delete(url('repo', repo_name=repo_name))
 

	
 
        self.assertTrue('''deleted repository %s''' % (repo_name) in
 
                        response.session['flash'][0])
 

	
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
                                  False)
 

	
 
    def test_delete_repo_with_group(self):
 
        #TODO:
 
        pass
 

	
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name=HG_REPO),
 
                                 params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
    def test_show_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('repo', repo_name=HG_REPO))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_repo', repo_name=HG_REPO,
 
                                    format='xml'))
 
    def test_show_git(self):
 
        self.log_user()
 
        response = self.app.get(url('repo', repo_name=GIT_REPO))
 

	
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_repo', repo_name=HG_REPO))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_repo', repo_name=HG_REPO,
 
                                    format='xml'))
rhodecode/tests/functional/test_forks.py
Show inline comments
 
@@ -7,109 +7,136 @@ from rhodecode.model.user import UserMod
 

	
 
class TestForksController(TestController):
 

	
 
    def setUp(self):
 
        self.username = u'forkuser'
 
        self.password = u'qweqwe'
 
        self.u1 = UserModel().create_or_update(
 
            username=self.username, password=self.password,
 
            email=u'fork_king@rhodecode.org', name=u'u1', lastname=u'u1'
 
        )
 
        self.Session.commit()
 

	
 
    def tearDown(self):
 
        self.Session.delete(self.u1)
 
        self.Session.commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        repo_name = HG_REPO
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        self.assertTrue("""There are no forks yet""" in response.body)
 

	
 
    def test_index_with_fork(self):
 
    def test_index_with_fork_hg(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        fork_name = HG_FORK
 
        description = 'fork of vcs test'
 
        repo_name = HG_REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        response = self.app.post(url(controller='forks',
 
                                     action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'repo_name':fork_name,
 
                                     'repo_group':'',
 
                                     'fork_parent_id':org_repo.repo_id,
 
                                     'repo_type':'hg',
 
                                     'description':description,
 
                                     'private':'False'})
 
                                    {'repo_name': fork_name,
 
                                     'repo_group': '',
 
                                     'fork_parent_id': org_repo.repo_id,
 
                                     'repo_type': 'hg',
 
                                     'description': description,
 
                                     'private': 'False',
 
                                     'landing_rev': 'tip'})
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        self.assertTrue("""<a href="/%s/summary">"""
 
                         """vcs_test_hg_fork</a>""" % fork_name
 
                         in response.body)
 
        response.mustcontain(
 
            """<a href="/%s/summary">%s</a>""" % (fork_name, fork_name)
 
        )
 

	
 
        #remove this fork
 
        response = self.app.delete(url('repo', repo_name=fork_name))
 

	
 
    def test_index_with_fork_git(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        fork_name = GIT_FORK
 
        description = 'fork of vcs test'
 
        repo_name = GIT_REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        response = self.app.post(url(controller='forks',
 
                                     action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'repo_name': fork_name,
 
                                     'repo_group': '',
 
                                     'fork_parent_id': org_repo.repo_id,
 
                                     'repo_type': 'git',
 
                                     'description': description,
 
                                     'private': 'False',
 
                                     'landing_rev': 'tip'})
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain(
 
            """<a href="/%s/summary">%s</a>""" % (fork_name, fork_name)
 
        )
 

	
 
        #remove this fork
 
        response = self.app.delete(url('repo', repo_name=fork_name))
 

	
 
    def test_z_fork_create(self):
 
        self.log_user()
 
        fork_name = HG_FORK
 
        description = 'fork of vcs test'
 
        repo_name = HG_REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        response = self.app.post(url(controller='forks', action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'repo_name':fork_name,
 
                                     'repo_group':'',
 
                                     'fork_parent_id':org_repo.repo_id,
 
                                     'repo_type':'hg',
 
                                     'description':description,
 
                                     'private':'False'})
 
                                     'private':'False',
 
                                     'landing_rev': 'tip'})
 

	
 
        #test if we have a message that fork is ok
 
        self.assertTrue('forked %s repository as %s' \
 
                      % (repo_name, fork_name) in response.session['flash'][0])
 
        self.checkSessionFlash(response,
 
                'forked %s repository as %s' % (repo_name, fork_name))
 

	
 
        #test if the fork was created in the database
 
        fork_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == fork_name).one()
 

	
 
        self.assertEqual(fork_repo.repo_name, fork_name)
 
        self.assertEqual(fork_repo.fork.repo_name, repo_name)
 

	
 
        #test if fork is visible in the list ?
 
        response = response.follow()
 

	
 
        # check if fork is marked as fork
 
        # wait for cache to expire
 
        import time
 
        time.sleep(10)
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=fork_name))
 

	
 
        self.assertTrue('Fork of %s' % repo_name in response.body)
 

	
 
    def test_zz_fork_permission_page(self):
 
        usr = self.log_user(self.username, self.password)['user_id']
 
        repo_name = HG_REPO
 

	
 
        forks = self.Session.query(Repository)\
 
            .filter(Repository.fork_id != None)\
 
            .all()
 
        self.assertEqual(1, len(forks))
 

	
 
        # set read permissions for this
 
        RepoModel().grant_user_permission(repo=forks[0],
 
                                          user=usr,
 
                                          perm='repository.read')
 
        self.Session.commit()
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain('<div style="padding:5px 3px 3px 42px;">fork of vcs test</div>')
rhodecode/tests/functional/test_search.py
Show inline comments
 
import os
 
from rhodecode.tests import *
 
import os
 
from nose.plugins.skip import SkipTest
 

	
 

	
 
class TestSearchController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'))
 

	
 
        self.assertTrue('class="small" id="q" name="q" type="text"' in
 
                        response.body)
 
        # Test response...
 

	
 
    def test_empty_search(self):
 
        if os.path.isdir(self.index_location):
 
            raise SkipTest('skipped due to existing index')
 
        else:
 
            self.log_user()
 
            response = self.app.get(url(controller='search', action='index'),
 
                                    {'q': HG_REPO})
 
            self.assertTrue('There is no index to search in. '
 
                            'Please run whoosh indexer' in response.body)
 

	
 
    def test_normal_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'def repo'})
 
        response.mustcontain('10 results')
 
        response.mustcontain('Permission denied')
 
        response.mustcontain('39 results')
 

	
 
    def test_repo_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'repository:%s def test' % HG_REPO})
 

	
 
        response.mustcontain('4 results')
 
        response.mustcontain('Permission denied')
rhodecode/tests/functional/test_summary.py
Show inline comments
 
@@ -71,34 +71,34 @@ class TestSummaryController(TestControll
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(HG_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="Mercurial repository" alt="Mercurial """
 
                        """repository" src="/images/icons/hgicon.png"/>""")
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="public repository" alt="public """
 
                        """repository" src="/images/icons/lock_open.png"/>""")
 

	
 
    def test_index_by_id_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="Git repository" alt="Git """
 
                        """repository" src="/images/icons/hgicon.png"/>""")
 
                        """repository" src="/images/icons/giticon.png"/>""")
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="public repository" alt="public """
 
                        """repository" src="/images/icons/lock_open.png"/>""")
 

	
 
    def _enable_stats(self):
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_statistics = True
 
        self.Session.add(r)
 
        self.Session.commit()
rhodecode/tests/test_models.py
Show inline comments
 
@@ -115,49 +115,50 @@ class TestReposGroups(unittest.TestCase)
 
        self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
 
        self.assertTrue(self.__check_path('test3', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'hello')
 
        self.assertTrue(self.__check_path('hello'))
 

	
 
        self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
 

	
 
    def test_subgrouping_with_repo(self):
 

	
 
        g1 = _make_group('g1')
 
        g2 = _make_group('g2')
 

	
 
        # create new repo
 
        form_data = dict(repo_name='john',
 
                         repo_name_full='john',
 
                         fork_name=None,
 
                         description=None,
 
                         repo_group=None,
 
                         private=False,
 
                         repo_type='hg',
 
                         clone_uri=None)
 
                         clone_uri=None,
 
                         landing_rev='tip')
 
        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        r = RepoModel().create(form_data, cur_user)
 

	
 
        self.assertEqual(r.repo_name, 'john')
 

	
 
        # put repo into group
 
        form_data = form_data
 
        form_data['repo_group'] = g1.group_id
 
        form_data['perms_new'] = []
 
        form_data['perms_updates'] = []
 
        RepoModel().update(r.repo_name, form_data)
 
        self.assertEqual(r.repo_name, 'g1/john')
 

	
 
        self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        self.assertTrue(self.__check_path('g2', 'g1'))
 

	
 
        # test repo
 
        self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name]))
 

	
 
    def test_move_to_root(self):
 
        g1 = _make_group('t11')
 
        Session.commit()
 
        g2 = _make_group('t22', parent_id=g1.group_id)
 
        Session.commit()
 
@@ -587,66 +588,66 @@ class TestPermissions(unittest.TestCase)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_repo_in_group_permissions(self):
 
        self.g1 = _make_group('group1', skip_if_exists=True)
 
        self.g2 = _make_group('group2', skip_if_exists=True)
 
        Session.commit()
 
        # both perms should be read !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        #Change perms to none for both groups
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.anon,
 
                                                perm='group.none')
 
        ReposGroupModel().grant_user_permission(repos_group=self.g2,
 
                                                user=self.anon,
 
                                                perm='group.none')
 

	
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        # add repo to group
 
        form_data = {
 
            'repo_name':HG_REPO,
 
            'repo_name_full':RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
 
            'repo_type':'hg',
 
            'clone_uri':'',
 
            'repo_group':self.g1.group_id,
 
            'description':'desc',
 
            'private':False
 
            'repo_name': HG_REPO,
 
            'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
 
            'repo_type': 'hg',
 
            'clone_uri': '',
 
            'repo_group': self.g1.group_id,
 
            'description': 'desc',
 
            'private': False,
 
            'landing_rev': 'tip'
 
        }
 
        self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
 
        Session.commit()
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        #grant permission for u2 !
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.u2,
 
                                                perm='group.read')
 
        ReposGroupModel().grant_user_permission(repos_group=self.g2,
 
                                                user=self.u2,
 
                                                perm='group.read')
 
        Session.commit()
 
        self.assertNotEqual(self.u1, self.u2)
 
        #u1 and anon should have not change perms while u2 should !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
rhodecode/tests/vcs/__init__.py
Show inline comments
 
@@ -31,26 +31,26 @@ def setup_package():
 
    Prepares whole package for tests which mainly means it would try to fetch
 
    test repositories or use already existing ones.
 
    """
 
    fetchers = {
 
        'hg': {
 
            'alias': 'hg',
 
            'test_repo_path': TEST_HG_REPO,
 
            'remote_repo': HG_REMOTE_REPO,
 
            'clone_cmd': 'hg clone',
 
        },
 
        'git': {
 
            'alias': 'git',
 
            'test_repo_path': TEST_GIT_REPO,
 
            'remote_repo': GIT_REMOTE_REPO,
 
            'clone_cmd': 'git clone --bare',
 
        },
 
    }
 
    try:
 
        for scm, fetcher_info in fetchers.items():
 
            fetcher = SCMFetcher(**fetcher_info)
 
            fetcher.setup()
 
    except VCSTestError, err:
 
        raise RuntimeError(str(err))
 

	
 
start_dir = os.path.abspath(os.path.dirname(__file__))
 
unittest.defaultTestLoader.discover(start_dir)
 
#start_dir = os.path.abspath(os.path.dirname(__file__))
 
#unittest.defaultTestLoader.discover(start_dir)
rhodecode/tests/vcs/conf.py
Show inline comments
 
"""
 
Unit tests configuration module for vcs.
 
"""
 

	
 
import os
 
import time
 
import hashlib
 
import tempfile
 
import datetime
 

	
 
from rhodecode.tests import *
 
from utils import get_normalized_path
 
from os.path import join as jn
 

	
 
__all__ = (
 
    'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
 
    'SCM_TESTS',
 
)
 

	
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
TEST_TMP_PATH = TESTS_TMP_PATH
 
#__all__ = (
 
#    'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
 
#    'SCM_TESTS',
 
#)
 
#
 
#SCM_TESTS = ['hg', 'git']
 
#uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 
#
 
THIS = os.path.abspath(os.path.dirname(__file__))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
 
TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-git'))
 
TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
 
                            jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
 
TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
 
                            jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-hg'))
 
TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
 
                              jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
 
TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
 
                              jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
 

	
 
TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 
#
 
#GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 
#
 
#TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
 
#TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
 
#                              jn(TEST_TMP_PATH, 'vcs-git'))
 
#TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
 
#                            jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
 
#TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
 
#                            jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
 
#
 
#HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
#TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
 
#                              jn(TEST_TMP_PATH, 'vcs-hg'))
 
#TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
 
#                              jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
 
#TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
 
#                              jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
 
#
 
#TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
 
#TEST_REPO_PREFIX = 'vcs-test'
 
#
 
#
 
#def get_new_dir(title):
 
#    """
 
#    Returns always new directory path.
 
#    """
 
#    name = TEST_REPO_PREFIX
 
#    if title:
 
#        name = '-'.join((name, title))
 
#    hex = hashlib.sha1(str(time.time())).hexdigest()
 
#    name = '-'.join((name, hex))
 
#    path = os.path.join(TEST_DIR, name)
 
#    return get_normalized_path(path)
 

	
 
PACKAGE_DIR = os.path.abspath(os.path.join(
 
    os.path.dirname(__file__), '..'))
 

	
 
TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
rhodecode/tests/vcs/test_hg.py
Show inline comments
 
@@ -135,56 +135,52 @@ class MercurialRepositoryTest(unittest.T
 
                'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
 
                '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
 
                'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
 
        self.assertEqual(org, self.repo.revisions[:31])
 

	
 
    def test_iter_slice(self):
 
        sliced = list(self.repo[:10])
 
        itered = list(self.repo)[:10]
 
        self.assertEqual(sliced, itered)
 

	
 
    def test_slicing(self):
 
        #4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            self.assertEqual(len(revs), size)
 
            self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
 
            self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 

	
 
        #active branches
 
        self.assertTrue('default' in self.repo.branches)
 

	
 
        #closed branches
 
        self.assertFalse('web' in self.repo.branches)
 
        self.assertFalse('git' in self.repo.branches)
 
        self.assertTrue('git' in self.repo.branches)
 

	
 
        # closed
 
        self.assertTrue('workdir' in self.repo._get_branches(closed=True))
 
        self.assertTrue('webvcs' in self.repo._get_branches(closed=True))
 
        self.assertTrue('web' in self.repo._get_branches(closed=True))
 

	
 
        for name, id in self.repo.branches.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), MercurialChangeset))
 

	
 
    def test_tip_in_tags(self):
 
        # tip is always a tag
 
        self.assertIn('tip', self.repo.tags)
 

	
 
    def test_tip_changeset_in_tags(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(self.repo.tags['tip'], tip.raw_id)
 

	
 
    def test_initial_changeset(self):
 

	
 
        init_chset = self.repo.get_changeset(0)
 
        self.assertEqual(init_chset.message, 'initial import')
 
        self.assertEqual(init_chset.author,
 
            'Marcin Kuzminski <marcin@python-blog.com>')
 
        self.assertEqual(sorted(init_chset._file_paths),
 
            sorted([
 
                'vcs/__init__.py',
 
                'vcs/backends/BaseRepository.py',
 
                'vcs/backends/__init__.py',

Changeset was too big and was cut off... Show full diff anyway

0 comments (0 inline, 0 general)