Changeset - 9492ab68331f
[Not reviewed]
rhodecode/controllers/admin/repos.py
Show inline comments
 
@@ -25,142 +25,145 @@
 

	
 
import logging
 
import traceback
 
import formencode
 
from formencode import htmlfill
 

	
 
from paste.httpexceptions import HTTPInternalServerError
 
from pylons import request, session, tmpl_context as c, url
 
from pylons.controllers.util import redirect
 
from pylons.i18n.translation import _
 
from sqlalchemy.exc import IntegrityError
 

	
 
from rhodecode.lib import helpers as h
 
from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator, \
 
    HasPermissionAnyDecorator, HasRepoPermissionAllDecorator
 
from rhodecode.lib.base import BaseController, render
 
from rhodecode.lib.utils import invalidate_cache, action_logger, repo_name_slug
 
from rhodecode.lib.helpers import get_token
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User, Repository, UserFollowing, RepoGroup
 
from rhodecode.model.forms import RepoForm
 
from rhodecode.model.scm import ScmModel
 
from rhodecode.model.repo import RepoModel
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ReposController(BaseController):
 
    """
 
    REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('repo', 'repos')
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def __before__(self):
 
        c.admin_user = session.get('admin_user')
 
        c.admin_username = session.get('admin_username')
 
        super(ReposController, self).__before__()
 

	
 
    def __load_defaults(self):
 
        c.repo_groups = RepoGroup.groups_choices()
 
        c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
 

	
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.users_groups_array = repo_model.get_users_groups_js()
 
        c.landing_revs = ScmModel().get_repo_landing_revs()
 

	
 
    def __load_data(self, repo_name=None):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param repo_name:
 
        """
 
        self.__load_defaults()
 

	
 
        c.repo_info = db_repo = Repository.get_by_repo_name(repo_name)
 
        repo = db_repo.scm_instance
 

	
 
        if c.repo_info is None:
 
            h.flash(_('%s repository is not mapped to db perhaps'
 
                      ' it was created or renamed from the filesystem'
 
                      ' please run the application again'
 
                      ' in order to rescan repositories') % repo_name,
 
                      category='error')
 

	
 
            return redirect(url('repos'))
 

	
 
        c.landing_revs = ScmModel().get_repo_landing_revs(c.repo_info)
 
        c.default_user_id = User.get_by_username('default').user_id
 
        c.in_public_journal = UserFollowing.query()\
 
            .filter(UserFollowing.user_id == c.default_user_id)\
 
            .filter(UserFollowing.follows_repository == c.repo_info).scalar()
 

	
 
        if c.repo_info.stats:
 
            # this is on what revision we ended up so we add +1 for count
 
            last_rev = c.repo_info.stats.stat_on_revision + 1
 
        else:
 
            last_rev = 0
 
        c.stats_revision = last_rev
 

	
 
        c.repo_last_rev = repo.count() if repo.revisions else 0
 

	
 
        if last_rev == 0 or c.repo_last_rev == 0:
 
            c.stats_percentage = 0
 
        else:
 
            c.stats_percentage = '%.2f' % ((float((last_rev)) /
 
                                            c.repo_last_rev) * 100)
 

	
 
        defaults = RepoModel()._get_defaults(repo_name)
 

	
 
        c.repos_list = [('', _('--REMOVE FORK--'))]
 
        c.repos_list += [(x.repo_id, x.repo_name) for x in
 
                   Repository.query().order_by(Repository.repo_name).all()]
 

	
 
        return defaults
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def index(self, format='html'):
 
        """GET /repos: All items in the collection"""
 
        # url('repos')
 

	
 
        c.repos_list = ScmModel().get_repos(Repository.query()
 
                                            .order_by(Repository.repo_name)
 
                                            .all(), sort_key='name_sort')
 
        return render('admin/repos/repos.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def create(self):
 
        """
 
        POST /repos: Create a new item"""
 
        # url('repos')
 

	
 
        self.__load_defaults()
 
        form_result = {}
 
        try:
 
            form_result = RepoForm(repo_groups=c.repo_groups_choices)()\
 
                            .to_python(dict(request.POST))
 
            RepoModel().create(form_result, self.rhodecode_user)
 
            if form_result['clone_uri']:
 
                h.flash(_('created repository %s from %s') \
 
                    % (form_result['repo_name'], form_result['clone_uri']),
 
                    category='success')
 
            else:
 
                h.flash(_('created repository %s') % form_result['repo_name'],
 
                    category='success')
 

	
 
            if request.POST.get('user_created'):
 
                # created by regular non admin user
 
                action_logger(self.rhodecode_user, 'user_created_repo',
 
                              form_result['repo_name_full'], self.ip_addr,
 
                              self.sa)
 
            else:
 
                action_logger(self.rhodecode_user, 'admin_created_repo',
 
                              form_result['repo_name_full'], self.ip_addr,
 
                              self.sa)
 
            Session.commit()
 
        except formencode.Invalid, errors:
 

	
 
            c.new_repo = errors.value['repo_name']
 

	
 
            if request.POST.get('user_created'):
 
                r = render('admin/repos/repo_add_create_repository.html')
rhodecode/controllers/admin/settings.py
Show inline comments
 
@@ -352,75 +352,76 @@ class SettingsController(BaseController)
 
        """PUT /_admin/my_account_update: Update an existing item"""
 
        # Forms posted to this method should contain a hidden field:
 
        #    <input type="hidden" name="_method" value="PUT" />
 
        # Or using helpers:
 
        #    h.form(url('admin_settings_my_account_update'),
 
        #           method='put')
 
        # url('admin_settings_my_account_update', id=ID)
 
        user_model = UserModel()
 
        uid = self.rhodecode_user.user_id
 
        _form = UserForm(edit=True,
 
                         old_data={'user_id': uid,
 
                                   'email': self.rhodecode_user.email})()
 
        form_result = {}
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            user_model.update_my_account(uid, form_result)
 
            h.flash(_('Your account was updated successfully'),
 
                    category='success')
 
            Session.commit()
 
        except formencode.Invalid, errors:
 
            c.user = User.get(self.rhodecode_user.user_id)
 
            all_repos = self.sa.query(Repository)\
 
                .filter(Repository.user_id == c.user.user_id)\
 
                .order_by(func.lower(Repository.repo_name))\
 
                .all()
 
            c.user_repos = ScmModel().get_repos(all_repos)
 

	
 
            c.form = htmlfill.render(
 
                render('admin/users/user_edit_my_account_form.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8")
 
            return render('admin/users/user_edit_my_account.html')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('error occurred during update of user %s') \
 
                    % form_result.get('username'), category='error')
 

	
 
        return redirect(url('my_account'))
 

	
 
    @NotAnonymous()
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def create_repository(self):
 
        """GET /_admin/create_repository: Form to create a new item"""
 

	
 
        c.repo_groups = RepoGroup.groups_choices()
 
        c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
 
        c.landing_revs = ScmModel().get_repo_landing_revs()
 

	
 
        new_repo = request.GET.get('repo', '')
 
        c.new_repo = repo_name_slug(new_repo)
 

	
 
        return render('admin/repos/repo_add_create_repository.html')
 

	
 
    def get_hg_ui_settings(self):
 
        ret = self.sa.query(RhodeCodeUi).all()
 

	
 
        if not ret:
 
            raise Exception('Could not get application ui settings !')
 
        settings = {}
 
        for each in ret:
 
            k = each.ui_key
 
            v = each.ui_value
 
            if k == '/':
 
                k = 'root_path'
 

	
 
            if k.find('.') != -1:
 
                k = k.replace('.', '_')
 

	
 
            if each.ui_section == 'hooks':
 
                v = each.ui_active
 

	
 
            settings[each.ui_section + '_' + k] = v
 

	
 
        return settings
rhodecode/lib/utils.py
Show inline comments
 
@@ -403,97 +403,98 @@ def map_groups(path):
 
        group = RepoGroup.get_by_group_name(group_name)
 
        desc = '%s group' % group_name
 

	
 
        # skip folders that are now removed repos
 
        if REMOVED_REPO_PAT.match(group_name):
 
            break
 

	
 
        if group is None:
 
            log.debug('creating group level: %s group_name: %s' % (lvl,
 
                                                                   group_name))
 
            group = RepoGroup(group_name, parent)
 
            group.group_description = desc
 
            sa.add(group)
 
            rgm._create_default_perms(group)
 
            sa.flush()
 
        parent = group
 
    return group
 

	
 

	
 
def repo2db_mapper(initial_repo_list, remove_obsolete=False):
 
    """
 
    maps all repos given in initial_repo_list, non existing repositories
 
    are created, if remove_obsolete is True it also check for db entries
 
    that are not in initial_repo_list and removes them.
 

	
 
    :param initial_repo_list: list of repositories found by scanning methods
 
    :param remove_obsolete: check for obsolete entries in database
 
    """
 
    from rhodecode.model.repo import RepoModel
 
    sa = meta.Session
 
    rm = RepoModel()
 
    user = sa.query(User).filter(User.admin == True).first()
 
    if user is None:
 
        raise Exception('Missing administrative account !')
 
    added = []
 

	
 
    for name, repo in initial_repo_list.items():
 
        group = map_groups(name)
 
        if not rm.get_by_repo_name(name, cache=False):
 
            log.info('repository %s not found creating default' % name)
 
            added.append(name)
 
            form_data = {
 
             'repo_name': name,
 
             'repo_name_full': name,
 
             'repo_type': repo.alias,
 
             'description': repo.description \
 
                if repo.description != 'unknown' else '%s repository' % name,
 
             'private': False,
 
             'group_id': getattr(group, 'group_id', None)
 
             'group_id': getattr(group, 'group_id', None),
 
             'landing_rev': repo.DEFAULT_BRANCH_NAME
 
            }
 
            rm.create(form_data, user, just_db=True)
 
    sa.commit()
 
    removed = []
 
    if remove_obsolete:
 
        # remove from database those repositories that are not in the filesystem
 
        for repo in sa.query(Repository).all():
 
            if repo.repo_name not in initial_repo_list.keys():
 
                log.debug("Removing non existing repository found in db %s" %
 
                          repo.repo_name)
 
                removed.append(repo.repo_name)
 
                sa.delete(repo)
 
                sa.commit()
 

	
 
    # clear cache keys
 
    log.debug("Clearing cache keys now...")
 
    CacheInvalidation.clear_cache()
 
    sa.commit()
 
    return added, removed
 

	
 

	
 
# set cache regions for beaker so celery can utilise it
 
def add_cache(settings):
 
    cache_settings = {'regions': None}
 
    for key in settings.keys():
 
        for prefix in ['beaker.cache.', 'cache.']:
 
            if key.startswith(prefix):
 
                name = key.split(prefix)[1].strip()
 
                cache_settings[name] = settings[key].strip()
 
    if cache_settings['regions']:
 
        for region in cache_settings['regions'].split(','):
 
            region = region.strip()
 
            region_settings = {}
 
            for key, value in cache_settings.items():
 
                if key.startswith(region):
 
                    region_settings[key.split('.')[1]] = value
 
            region_settings['expire'] = int(region_settings.get('expire',
 
                                                                60))
 
            region_settings.setdefault('lock_dir',
 
                                       cache_settings.get('lock_dir'))
 
            region_settings.setdefault('data_dir',
 
                                       cache_settings.get('data_dir'))
 

	
 
            if 'type' not in region_settings:
 
                region_settings['type'] = cache_settings.get('type',
 
                                                             'memory')
 
            beaker.cache.cache_regions[region] = region_settings
 

	
 
@@ -513,137 +514,142 @@ def load_rcextensions(root_path):
 

	
 
        #OVERRIDE OUR EXTENSIONS FROM RC-EXTENSIONS (if present)
 

	
 
        if getattr(EXT, 'INDEX_EXTENSIONS', []) != []:
 
            log.debug('settings custom INDEX_EXTENSIONS')
 
            conf.INDEX_EXTENSIONS = getattr(EXT, 'INDEX_EXTENSIONS', [])
 

	
 
        #ADDITIONAL MAPPINGS
 
        log.debug('adding extra into INDEX_EXTENSIONS')
 
        conf.INDEX_EXTENSIONS.extend(getattr(EXT, 'EXTRA_INDEX_EXTENSIONS', []))
 

	
 

	
 
#==============================================================================
 
# TEST FUNCTIONS AND CREATORS
 
#==============================================================================
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Makes default test index
 

	
 
    :param config: test config
 
    :param full_index:
 
    """
 

	
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    from rhodecode.lib.pidlock import DaemonLock, LockHeld
 

	
 
    repo_location = repo_location
 

	
 
    index_location = os.path.join(config['app_conf']['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    try:
 
        l = DaemonLock(file_=jn(dn(index_location), 'make_index.lock'))
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location)\
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
 

	
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Makes a fresh database and
 
    install test repository into tmp dir
 
    """
 
    from rhodecode.lib.db_manage import DbManage
 
    from rhodecode.tests import HG_REPO, TESTS_TMP_PATH
 
    from rhodecode.tests import HG_REPO, GIT_REPO, TESTS_TMP_PATH
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.db1.url']
 
    log.debug('making test db %s' % dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s' % repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session.commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['app_conf']['index_dir']
 
    data_path = config['app_conf']['cache_dir']
 

	
 
    #clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s' % idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s' % data_path)
 
        shutil.rmtree(data_path)
 

	
 
    #CREATE DEFAULT HG REPOSITORY
 
    #CREATE DEFAULT TEST REPOS
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_git.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    #LOAD VCS test stuff
 
    from rhodecode.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
#==============================================================================
 
# PASTER COMMANDS
 
#==============================================================================
 
class BasePasterCommand(Command):
 
    """
 
    Abstract Base Class for paster commands.
 

	
 
    The celery commands are somewhat aggressive about loading
 
    celery.conf, and since our module sets the `CELERY_LOADER`
 
    environment variable to our loader, we have to bootstrap a bit and
 
    make sure we've had a chance to load the pylons config off of the
 
    command line, otherwise everything fails.
 
    """
 
    min_args = 1
 
    min_args_error = "Please provide a paster config file as an argument."
 
    takes_config_file = 1
 
    requires_config_file = True
 

	
 
    def notify_msg(self, msg, log=False):
 
        """Make a notification to user, additionally if logger is passed
 
        it logs this action using given logger
 

	
 
        :param msg: message that will be printed to user
 
        :param log: logging instance, to use to additionally log this message
 

	
 
        """
 
        if log and isinstance(log, logging):
 
            log(msg)
 

	
 
    def run(self, args):
 
        """
 
        Overrides Command.run
 

	
 
        Checks for a config file argument and loads it.
 
        """
 
        if len(args) < self.min_args:
 
            raise BadCommand(
 
                self.min_args_error % {'min_args': self.min_args,
 
                                       'actual_args': len(args)})
 

	
 
        # Decrement because we're going to lob off the first argument.
 
        # @@ This is hacky
 
        self.min_args -= 1
rhodecode/model/db.py
Show inline comments
 
@@ -250,97 +250,97 @@ class RhodeCodeUi(Base, BaseModel):
 
    @classmethod
 
    def get_by_key(cls, key):
 
        return cls.query().filter(cls.ui_key == key)
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE,
 
                                    cls.HOOK_REPO_SIZE,
 
                                    cls.HOOK_PUSH, cls.HOOK_PULL]))
 
        return q.all()
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE,
 
                                    cls.HOOK_REPO_SIZE,
 
                                    cls.HOOK_PUSH, cls.HOOK_PULL]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('/').one().ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_by_key(key).scalar() or cls()
 
        new_ui.ui_section = 'hooks'
 
        new_ui.ui_active = True
 
        new_ui.ui_key = key
 
        new_ui.ui_value = val
 

	
 
        Session.add(new_ui)
 

	
 

	
 
class User(Base, BaseModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        UniqueConstraint('username'), UniqueConstraint('email'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    username = Column("username", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    password = Column("password", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    active = Column("active", Boolean(), nullable=True, unique=None, default=None)
 
    admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
 
    name = Column("name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    name = Column("firstname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    lastname = Column("lastname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    _email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
 
    ldap_dn = Column("ldap_dn", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    api_key = Column("api_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 

	
 
    user_log = relationship('UserLog', cascade='all')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UsersGroupMember', cascade='all')
 

	
 
    notifications = relationship('UserNotification', cascade='all')
 
    # notifications assigned to this user
 
    user_created_notifications = relationship('Notification', cascade='all')
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.name, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        return ('%s %s' % (self.name, self.lastname)
 
                if (self.name and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.name, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.name, self.lastname)
 

	
 
    @property
 
@@ -455,103 +455,104 @@ class UsersGroup(Base, BaseModel):
 
            q = cls.query().filter(cls.users_group_name == group_name)
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(group_name)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get(cls, users_group_id, cache=False):
 
        users_group = cls.query()
 
        if cache:
 
            users_group = users_group.options(FromCache("sql_cache_short",
 
                                    "get_users_group_%s" % users_group_id))
 
        return users_group.get(users_group_id)
 

	
 

	
 
class UsersGroupMember(Base, BaseModel):
 
    __tablename__ = 'users_groups_members'
 
    __table_args__ = (
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 

	
 
    users_group_member_id = Column("users_group_member_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
 

	
 
    user = relationship('User', lazy='joined')
 
    users_group = relationship('UsersGroup')
 

	
 
    def __init__(self, gr_id='', u_id=''):
 
        self.users_group_id = gr_id
 
        self.user_id = u_id
 

	
 

	
 
class Repository(Base, BaseModel):
 
    __tablename__ = 'repositories'
 
    __table_args__ = (
 
        UniqueConstraint('repo_name'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 

	
 
    repo_id = Column("repo_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    repo_name = Column("repo_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
 
    clone_uri = Column("clone_uri", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
 
    repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default='hg')
 
    repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
 
    private = Column("private", Boolean(), nullable=True, unique=None, default=None)
 
    enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
 
    enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
 
    description = Column("description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
 
    landing_rev = Column("landing_revision", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
 

	
 
    fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
 
    group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
 

	
 
    user = relationship('User')
 
    fork = relationship('Repository', remote_side=repo_id)
 
    group = relationship('RepoGroup')
 
    repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
 
    users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
 
    stats = relationship('Statistics', cascade='all', uselist=False)
 

	
 
    followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id', cascade='all')
 

	
 
    logs = relationship('UserLog')
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (self.__class__.__name__,self.repo_id,
 
                                   self.repo_name)
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def get_by_repo_name(cls, repo_name):
 
        q = Session.query(cls).filter(cls.repo_name == repo_name)
 
        q = q.options(joinedload(Repository.fork))\
 
                .options(joinedload(Repository.user))\
 
                .options(joinedload(Repository.group))
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_full_path(cls, repo_full_path):
 
        repo_name = repo_full_path.split(cls.base_path(), 1)[-1]
 
        return cls.get_by_repo_name(repo_name.strip(URL_SEP))
 

	
 
    @classmethod
 
    def get_repo_forks(cls, repo_id):
 
        return cls.query().filter(Repository.fork_id == repo_id)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path when all repos are stored
 

	
 
        :param cls:
 
        """
 
        q = Session.query(RhodeCodeUi)\
rhodecode/model/forms.py
Show inline comments
 
@@ -617,141 +617,142 @@ def ReposGroupForm(edit=False, old_data=
 
                               SlugifyName())
 
        group_description = UnicodeString(strip=True, min=1,
 
                                                not_empty=True)
 
        group_parent_id = OneOf(available_groups, hideList=False,
 
                                        testValueList=True,
 
                                        if_missing=None, not_empty=False)
 

	
 
        chained_validators = [ValidReposGroup(edit, old_data), ValidPerms('group')]
 

	
 
    return _ReposGroupForm
 

	
 

	
 
def RegisterForm(edit=False, old_data={}):
 
    class _RegisterForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(ValidUsername(edit, old_data),
 
                       UnicodeString(strip=True, min=1, not_empty=True))
 
        password = All(UnicodeString(strip=False, min=6, not_empty=True))
 
        password_confirmation = All(UnicodeString(strip=False, min=6, not_empty=True))
 
        active = StringBoolean(if_missing=False)
 
        name = UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(Email(not_empty=True), UniqSystemEmail(old_data))
 

	
 
        chained_validators = [ValidPasswordsMatch, ValidPassword]
 

	
 
    return _RegisterForm
 

	
 

	
 
def PasswordResetForm():
 
    class _PasswordResetForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        email = All(ValidSystemEmail(), Email(not_empty=True))
 
    return _PasswordResetForm
 

	
 

	
 
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
             repo_groups=[]):
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                        SlugifyName())
 
        clone_uri = All(UnicodeString(strip=True, min=1, not_empty=False))
 
        repo_group = OneOf(repo_groups, hideList=True)
 
        repo_type = OneOf(supported_backends)
 
        description = UnicodeString(strip=True, min=1, not_empty=True)
 
        description = UnicodeString(strip=True, min=1, not_empty=False)
 
        private = StringBoolean(if_missing=False)
 
        enable_statistics = StringBoolean(if_missing=False)
 
        enable_downloads = StringBoolean(if_missing=False)
 
        landing_rev = UnicodeString(strip=True, min=1, not_empty=True)
 

	
 
        if edit:
 
            #this is repo owner
 
            user = All(UnicodeString(not_empty=True), ValidRepoUser)
 

	
 
        chained_validators = [ValidCloneUri()(),
 
                              ValidRepoName(edit, old_data),
 
                              ValidPerms()]
 
    return _RepoForm
 

	
 

	
 
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
                 repo_groups=[]):
 
def RepoForkForm(edit=False, old_data={},
 
                 supported_backends=BACKENDS.keys(), repo_groups=[]):
 
    class _RepoForkForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                        SlugifyName())
 
        repo_group = OneOf(repo_groups, hideList=True)
 
        repo_type = All(ValidForkType(old_data), OneOf(supported_backends))
 
        description = UnicodeString(strip=True, min=1, not_empty=True)
 
        private = StringBoolean(if_missing=False)
 
        copy_permissions = StringBoolean(if_missing=False)
 
        update_after_clone = StringBoolean(if_missing=False)
 
        fork_parent_id = UnicodeString()
 
        chained_validators = [ValidForkName(edit, old_data)]
 

	
 
    return _RepoForkForm
 

	
 

	
 
def RepoSettingsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
                     repo_groups=[]):
 
def RepoSettingsForm(edit=False, old_data={},
 
                     supported_backends=BACKENDS.keys(), repo_groups=[]):
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                        SlugifyName())
 
        description = UnicodeString(strip=True, min=1, not_empty=True)
 
        repo_group = OneOf(repo_groups, hideList=True)
 
        private = StringBoolean(if_missing=False)
 

	
 
        landing_rev = UnicodeString(strip=True, min=1, not_empty=True)
 
        chained_validators = [ValidRepoName(edit, old_data), ValidPerms(),
 
                              ValidSettings]
 
    return _RepoForm
 

	
 

	
 
def ApplicationSettingsForm():
 
    class _ApplicationSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        rhodecode_title = UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_realm = UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_ga_code = UnicodeString(strip=True, min=1, not_empty=False)
 

	
 
    return _ApplicationSettingsForm
 

	
 

	
 
def ApplicationUiSettingsForm():
 
    class _ApplicationUiSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        web_push_ssl = OneOf(['true', 'false'], if_missing='false')
 
        paths_root_path = All(ValidPath(), UnicodeString(strip=True, min=1, not_empty=True))
 
        hooks_changegroup_update = OneOf(['True', 'False'], if_missing=False)
 
        hooks_changegroup_repo_size = OneOf(['True', 'False'], if_missing=False)
 
        hooks_changegroup_push_logger = OneOf(['True', 'False'], if_missing=False)
 
        hooks_preoutgoing_pull_logger = OneOf(['True', 'False'], if_missing=False)
 

	
 
    return _ApplicationUiSettingsForm
 

	
 

	
 
def DefaultPermissionsForm(perms_choices, register_choices, create_choices):
 
    class _DefaultPermissionsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        overwrite_default = StringBoolean(if_missing=False)
 
        anonymous = OneOf(['True', 'False'], if_missing=False)
 
        default_perm = OneOf(perms_choices)
 
        default_register = OneOf(register_choices)
 
        default_create = OneOf(create_choices)
 

	
 
    return _DefaultPermissionsForm
 

	
 

	
 
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices, tls_kind_choices):
 
    class _LdapSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        #pre_validators = [LdapLibValidator]
rhodecode/model/scm.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.model.scm
 
    ~~~~~~~~~~~~~~~~~~~
 

	
 
    Scm model for RhodeCode
 

	
 
    :created_on: Apr 9, 2010
 
    :author: marcink
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
import os
 
import time
 
import traceback
 
import logging
 
import cStringIO
 

	
 
from sqlalchemy import func
 
from pylons.i18n.translation import _
 

	
 
from rhodecode.lib.vcs import get_backend
 
from rhodecode.lib.vcs.exceptions import RepositoryError
 
from rhodecode.lib.vcs.utils.lazy import LazyProperty
 
from rhodecode.lib.vcs.nodes import FileNode
 

	
 
from rhodecode import BACKENDS
 
from rhodecode.lib import helpers as h
 
from rhodecode.lib.utils2 import safe_str, safe_unicode
 
from rhodecode.lib.auth import HasRepoPermissionAny, HasReposGroupPermissionAny
 
from rhodecode.lib.utils import get_repos as get_filesystem_repos, make_ui, \
 
    action_logger, EmptyChangeset, REMOVED_REPO_PAT
 
from rhodecode.model import BaseModel
 
from rhodecode.model.db import Repository, RhodeCodeUi, CacheInvalidation, \
 
    UserFollowing, UserLog, User, RepoGroup
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserTemp(object):
 
    def __init__(self, user_id):
 
        self.user_id = user_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.user_id)
 

	
 

	
 
class RepoTemp(object):
 
    def __init__(self, repo_id):
 
        self.repo_id = repo_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.repo_id)
 

	
 

	
 
class CachedRepoList(object):
 

	
 
    def __init__(self, db_repo_list, repos_path, order_by=None):
 
        self.db_repo_list = db_repo_list
 
        self.repos_path = repos_path
 
        self.order_by = order_by
 
        self.reversed = (order_by or '').startswith('-')
 

	
 
    def __len__(self):
 
        return len(self.db_repo_list)
 

	
 
    def __repr__(self):
 
        return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
 
@@ -427,48 +428,78 @@ class ScmModel(BaseModel):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [cs]
 

	
 
        m.add(FileNode(path, content=content))
 
        tip = m.commit(message=message,
 
                       author=author,
 
                       parents=parents, branch=cs.branch)
 
        new_cs = tip.short_id
 
        action = 'push_local:%s' % new_cs
 

	
 
        action_logger(user, action, repo_name)
 

	
 
        self.mark_for_invalidation(repo_name)
 

	
 
    def get_nodes(self, repo_name, revision, root_path='/', flat=True):
 
        """
 
        recursive walk in root dir and return a set of all path in that dir
 
        based on repository walk function
 

	
 
        :param repo_name: name of repository
 
        :param revision: revision for which to list nodes
 
        :param root_path: root path to list
 
        :param flat: return as a list, if False returns a dict with decription
 

	
 
        """
 
        _files = list()
 
        _dirs = list()
 
        try:
 
            _repo = self.__get_repo(repo_name)
 
            changeset = _repo.scm_instance.get_changeset(revision)
 
            root_path = root_path.lstrip('/')
 
            for topnode, dirs, files in changeset.walk(root_path):
 
                for f in files:
 
                    _files.append(f.path if flat else {"name": f.path,
 
                                                       "type": "file"})
 
                for d in dirs:
 
                    _dirs.append(d.path if flat else {"name": d.path,
 
                                                      "type": "dir"})
 
        except RepositoryError:
 
            log.debug(traceback.format_exc())
 
            raise
 

	
 
        return _dirs, _files
 

	
 
    def get_unread_journal(self):
 
        return self.sa.query(UserLog).count()
 

	
 
    def get_repo_landing_revs(self, repo=None):
 
        """
 
        Generates select option with tags branches and bookmarks (for hg only)
 
        grouped by type
 

	
 
        :param repo:
 
        :type repo:
 
        """
 
        hist_l = []
 
        repo = self.__get_repo(repo)
 
        hist_l.append(['tip', _('latest tip')])
 
        if not repo:
 
            return hist_l
 

	
 
        repo = repo.scm_instance
 
        branches_group = ([(k, k) for k, v in
 
                           repo.branches.iteritems()], _("Branches"))
 
        hist_l.append(branches_group)
 

	
 
        if repo.alias == 'hg':
 
            bookmarks_group = ([(k, k) for k, v in
 
                                repo.bookmarks.iteritems()], _("Bookmarks"))
 
            hist_l.append(bookmarks_group)
 

	
 
        tags_group = ([(k, k) for k, v in
 
                       repo.tags.iteritems()], _("Tags"))
 
        hist_l.append(tags_group)
 

	
 
        return hist_l
rhodecode/templates/admin/repos/repo_add_base.html
Show inline comments
 
## -*- coding: utf-8 -*-
 

	
 
${h.form(url('repos'))}
 
<div class="form">
 
    <!-- fields -->
 
    <div class="fields">
 
        <div class="field">
 
            <div class="label">
 
                <label for="repo_name">${_('Name')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.text('repo_name',c.new_repo,class_="small")}
 
                %if not h.HasPermissionAll('hg.admin')('repo create form'):
 
                    ${h.hidden('user_created',True)}
 
                %endif
 
            </div>
 
         </div>
 
        <div class="field">
 
            <div class="label">
 
                <label for="clone_uri">${_('Clone from')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.text('clone_uri',class_="small")}
 
                <span class="help-block">${_('Optional http[s] url from which repository should be cloned.')}</span>
 
            </div>
 
         </div>
 
         <div class="field">
 
             <div class="label">
 
                 <label for="repo_group">${_('Repository group')}:</label>
 
             </div>
 
             <div class="input">
 
                 ${h.select('repo_group',request.GET.get('parent_group'),c.repo_groups,class_="medium")}
 
                 <span class="help-block">${_('Optional select a group to put this repository into.')}</span>
 
             </div>
 
         </div>
 
        <div class="field">
 
            <div class="label">
 
                <label for="repo_type">${_('Type')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.select('repo_type','hg',c.backends,class_="small")}
 
                <span class="help-block">${_('Type of repository to create.')}</span>
 
            </div>
 
         </div>
 
        <div class="field">
 
            <div class="label">
 
                <label for="landing_rev">${_('Landing revision')}:</label>
 
            </div>
 
            <div class="input">
 
                ${h.select('landing_rev','',c.landing_revs,class_="medium")}
 
                <span class="help-block">${_('Default revision for files page, downloads, whoosh and readme')}</span>
 
            </div>
 
        </div>           
 
        <div class="field">
 
            <div class="label label-textarea">
 
                <label for="description">${_('Description')}:</label>
 
            </div>
 
            <div class="textarea text-area editor">
 
                ${h.textarea('description')}
 
                <span class="help-block">${_('Keep it short and to the point. Use a README file for longer descriptions.')}</span>
 
            </div>
 
         </div>
 
        <div class="field">
 
            <div class="label label-checkbox">
 
                <label for="private">${_('Private repository')}:</label>
 
            </div>
 
            <div class="checkboxes">
 
                ${h.checkbox('private',value="True")}
 
                <span class="help-block">${_('Private repositories are only visible to people explicitly added as collaborators.')}</span>
 
            </div>
 
         </div>
 
        <div class="buttons">
 
          ${h.submit('add',_('add'),class_="ui-button")}
 
        </div>
 
    </div>
 
</div>
 
${h.end_form()}
rhodecode/templates/admin/repos/repo_edit.html
Show inline comments
 
@@ -17,96 +17,105 @@
 
	${self.menu('admin')}
 
</%def>
 

	
 
<%def name="main()">
 
<div class="box box-left">
 
    <!-- box / title -->
 
    <div class="title">
 
        ${self.breadcrumbs()}
 
    </div>
 
    ${h.form(url('repo', repo_name=c.repo_info.repo_name),method='put')}
 
    <div class="form">
 
        <!-- fields -->
 
        <div class="fields">
 
            <div class="field">
 
                <div class="label">
 
                    <label for="repo_name">${_('Name')}:</label>
 
                </div>
 
                <div class="input">
 
                    ${h.text('repo_name',class_="medium")}
 
                </div>
 
           </div>
 
	       <div class="field">
 
	           <div class="label">
 
	               <label for="clone_uri">${_('Clone uri')}:</label>
 
	           </div>
 
	           <div class="input">
 
	               ${h.text('clone_uri',class_="medium")}
 
                 <span class="help-block">${_('Optional http[s] url from which repository should be cloned.')}</span>
 
	           </div>
 
	        </div>
 
	        <div class="field">
 
	            <div class="label">
 
	                <label for="repo_group">${_('Repository group')}:</label>
 
	            </div>
 
	            <div class="input">
 
	                ${h.select('repo_group','',c.repo_groups,class_="medium")}
 
                    <span class="help-block">${_('Optional select a group to put this repository into.')}</span>
 
	            </div>
 
	        </div>
 
            <div class="field">
 
                <div class="label">
 
                    <label for="repo_type">${_('Type')}:</label>
 
                </div>
 
                <div class="input">
 
                    ${h.select('repo_type','hg',c.backends,class_="medium")}
 
                </div>
 
            </div>
 
            <div class="field">
 
                <div class="label">
 
                    <label for="landing_rev">${_('Landing revision')}:</label>
 
                </div>
 
                <div class="input">
 
                    ${h.select('landing_rev','',c.landing_revs,class_="medium")}
 
                    <span class="help-block">${_('Default revision for files page, downloads, whoosh and readme')}</span>
 
                </div>
 
            </div>            
 
            <div class="field">
 
                <div class="label label-textarea">
 
                    <label for="description">${_('Description')}:</label>
 
                </div>
 
                <div class="textarea text-area editor">
 
                    ${h.textarea('description')}
 
                    <span class="help-block">${_('Keep it short and to the point. Use a README file for longer descriptions.')}</span>
 
                </div>
 
            </div>
 

	
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="private">${_('Private repository')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('private',value="True")}
 
                    <span class="help-block">${_('Private repositories are only visible to people explicitly added as collaborators.')}</span>
 
                </div>
 
            </div>
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="enable_statistics">${_('Enable statistics')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('enable_statistics',value="True")}
 
                    <span class="help-block">${_('Enable statistics window on summary page.')}</span>
 
                </div>
 
            </div>
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="enable_downloads">${_('Enable downloads')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('enable_downloads',value="True")}
 
                    <span class="help-block">${_('Enable download menu on summary page.')}</span>
 
                </div>
 
            </div>
 
            <div class="field">
 
                <div class="label">
 
                    <label for="user">${_('Owner')}:</label>
 
                </div>
 
                <div class="input input-medium ac">
 
                    <div class="perm_ac">
 
                       ${h.text('user',class_='yui-ac-input')}
 
                       <span class="help-block">${_('Change owner of this repository.')}</span>
 
                       <div id="owner_container"></div>
 
                    </div>
 
                </div>
 
             </div>
 
@@ -165,67 +174,67 @@
 
        ${h.end_form()}
 
        %endif
 

	
 
        <h3>${_('Cache')}</h3>
 
        ${h.form(url('repo_cache', repo_name=c.repo_info.repo_name),method='delete')}
 
        <div class="form">
 
           <div class="fields">
 
               ${h.submit('reset_cache_%s' % c.repo_info.repo_name,_('Invalidate repository cache'),class_="ui-btn",onclick="return confirm('"+_('Confirm to invalidate repository cache')+"');")}
 
           </div>
 
        </div>
 
        ${h.end_form()}
 

	
 
        <h3>${_('Public journal')}</h3>
 
        ${h.form(url('repo_public_journal', repo_name=c.repo_info.repo_name),method='put')}
 
        <div class="form">
 
                ${h.hidden('auth_token',str(h.get_token()))}
 
                <div class="field">
 
                %if c.in_public_journal:
 
                    ${h.submit('set_public_%s' % c.repo_info.repo_name,_('Remove from public journal'),class_="ui-btn")}
 
                %else:
 
		            ${h.submit('set_public_%s' % c.repo_info.repo_name,_('Add to public journal'),class_="ui-btn")}
 
		        %endif
 
                </div>
 
               <div class="field" style="border:none;color:#888">
 
               <ul>
 
                    <li>${_('''All actions made on this repository will be accessible to everyone in public journal''')}
 
                    </li>
 
               </ul>
 
               </div>
 
        </div>
 
        ${h.end_form()}
 

	
 
        <h3>${_('Delete')}</h3>
 
        ${h.form(url('repo', repo_name=c.repo_info.repo_name),method='delete')}
 
        <div class="form">
 
           <div class="fields">
 
               ${h.submit('remove_%s' % c.repo_info.repo_name,_('Remove this repository'),class_="ui-btn red",onclick="return confirm('"+_('Confirm to delete this repository')+"');")}
 
           </div>
 
           <div class="field" style="border:none;color:#888">
 
           <ul>
 
                <li>${_('''This repository will be renamed in a special way in order to be unaccesible for RhodeCode and VCS systems.
 
                         If you need fully delete it from filesystem please do it manually''')}
 
                </li>
 
           </ul>
 
           </div>
 
        </div>
 
        ${h.end_form()}
 

	
 
        <h3>${_('Set as fork')}</h3>
 
        <h3>${_('Set as fork of')}</h3>
 
        ${h.form(url('repo_as_fork', repo_name=c.repo_info.repo_name),method='put')}
 
        <div class="form">
 
           <div class="fields">
 
               ${h.select('id_fork_of','',c.repos_list,class_="medium")}
 
               ${h.submit('set_as_fork_%s' % c.repo_info.repo_name,_('set'),class_="ui-btn",)}
 
           </div>
 
               <div class="field" style="border:none;color:#888">
 
               <ul>
 
                    <li>${_('''Manually set this repository as a fork of another''')}</li>
 
                    <li>${_('''Manually set this repository as a fork of another from the list''')}</li>
 
               </ul>
 
               </div>
 
        </div>
 
        ${h.end_form()}
 

	
 
</div>
 

	
 

	
 
</%def>
rhodecode/tests/__init__.py
Show inline comments
 
"""Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 
"""
 
import os
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from unittest import TestCase
 
from tempfile import _RandomNameSequence
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 

	
 
from rhodecode import is_windows
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User
 

	
 
import pylons.test
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
 
    'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK',
 
    'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
 
    'environ', 'url', 'get_new_dir', 'TestController', 'TESTS_TMP_PATH',
 
    'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK',
 
    'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
 
    'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
 
    'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
 
    'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_GIT_REPO',
 
    'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
 
    'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
 
    'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
 
    'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
# nosetests --pdb --pdb-failures
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
THIS = os.path.abspath(os.path.dirname(__file__))
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, 'vcs-hg')
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from rhodecode.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
PACKAGE_DIR = os.path.abspath(os.path.join(
 
    os.path.dirname(__file__), '..'))
 

	
 
TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
 

	
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.Session = Session
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['rhodecode_user']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['rhodecode_user']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def checkSessionFlash(self, response, msg):
 
        self.assertTrue('flash' in response.session)
 
        self.assertTrue(msg in response.session['flash'][0][1])
rhodecode/tests/functional/test_admin_repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import os
 
from rhodecode.lib import vcs
 

	
 
from rhodecode.model.db import Repository
 
from rhodecode.tests import *
 

	
 

	
 
class TestAdminReposController(TestController):
 

	
 
    def __make_repo(self):
 
        pass
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('repos'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_repos', format='xml'))
 

	
 
    def test_create_hg(self):
 
        self.log_user()
 
        repo_name = NEW_HG_REPO
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'hg',
 
                                                'clone_uri':'',
 
                                                'repo_group':'',
 
                                                'description':description,
 
                                                'private':private})
 
        self.checkSessionFlash(response, 'created repository %s' % (repo_name))
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository).filter(Repository.repo_name ==
 
                                                    repo_name).one()
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        self.assertTrue(repo_name in response.body)
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo in filesystem')
 

	
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_create_hg_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
 
        repo_name_unicode = repo_name.decode('utf8')
 
        description = 'description for newly created repo' + non_ascii
 
        description_unicode = description.decode('utf8')
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'hg',
 
                                                'clone_uri':'',
 
                                                'repo_group':'',
 
                                                'description':description,
 
                                                'private':private})
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name_unicode))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository).filter(Repository.repo_name ==
 
                                                repo_name_unicode).one()
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_unicode)
 
        self.assertEqual(new_repo.description, description_unicode)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        self.assertTrue(repo_name in response.body)
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo in filesystem')
 

	
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_create_hg_in_group(self):
 
        #TODO: write test !
 
        pass
 

	
 
    def test_create_git(self):
 
        return
 
        self.log_user()
 
        repo_name = NEW_GIT_REPO
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'git',
 
                                                'clone_uri':'',
 
                                                'repo_group':'',
 
                                                'description':description,
 
                                                'private':private})
 

	
 

	
 
        #test if we have a message for that repository
 
        assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the fork was created in the database
 
        new_repo = self.Session.query(Repository).filter(Repository.repo_name == repo_name).one()
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
 
        assert new_repo.description == description, 'wrong description'
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        assert repo_name in response.body, 'missing new repo from the main repos list'
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            assert False , 'no repo in filesystem'
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_create_git_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
 
        repo_name_unicode = repo_name.decode('utf8')
 
        description = 'description for newly created repo' + non_ascii
 
        description_unicode = description.decode('utf8')
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name': repo_name,
 
                                                'repo_type': 'git',
 
                                                'clone_uri': '',
 
                                                'repo_group': '',
 
                                                'description': description,
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name_unicode))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_unicode)
 
        self.assertEqual(new_repo.description, description_unicode)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_repo'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_repo', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('repo', repo_name=HG_REPO))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name=HG_REPO),
 
                                 params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
    def test_delete_hg(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete'
 
        description = 'description for newly created repo'
 
        private = False
 

	
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'hg',
 
                                                'clone_uri':'',
 
                                                'repo_group':'',
 
                                                'description':description,
 
                                                'private':private})
 
        self.assertTrue('flash' in response.session)
 

	
 
        #test if we have a message for that repository
 
        self.assertTrue('''created repository %s''' % (repo_name) in
 
                        response.session['flash'][0])
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository).filter(Repository.repo_name ==
 
                                                    repo_name).one()
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        self.assertTrue(repo_name in response.body)
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.delete(url('repo', repo_name=repo_name))
 

	
 
        self.assertTrue('''deleted repository %s''' % (repo_name) in
 
                        response.session['flash'][0])
 

	
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = self.Session.query(Repository).filter(Repository.repo_name
 
                                                        == repo_name).scalar()
 
        deleted_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
                                  False)
 

	
 
    def test_delete_git(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete'
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name': repo_name,
 
                                                'repo_type': 'git',
 
                                                'clone_uri': '',
 
                                                'repo_group': '',
 
                                                'description': description,
 
                                                'private': private,
 
                                                'landing_rev': 'tip'})
 
        self.checkSessionFlash(response,
 
                               'created repository %s' % (repo_name))
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        response.mustcontain(repo_name)
 

	
 
        #test if repository was created on filesystem
 
        try:
 
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
 
        except:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.delete(url('repo', repo_name=repo_name))
 

	
 
        self.assertTrue('''deleted repository %s''' % (repo_name) in
 
                        response.session['flash'][0])
 

	
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
                                  False)
 

	
 
    def test_delete_repo_with_group(self):
 
        #TODO:
 
        pass
 

	
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name=HG_REPO),
 
                                 params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
    def test_show_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('repo', repo_name=HG_REPO))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_repo', repo_name=HG_REPO,
 
                                    format='xml'))
 
    def test_show_git(self):
 
        self.log_user()
 
        response = self.app.get(url('repo', repo_name=GIT_REPO))
 

	
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_repo', repo_name=HG_REPO))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_repo', repo_name=HG_REPO,
 
                                    format='xml'))
rhodecode/tests/functional/test_forks.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.db import Repository
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.user import UserModel
 

	
 

	
 
class TestForksController(TestController):
 

	
 
    def setUp(self):
 
        self.username = u'forkuser'
 
        self.password = u'qweqwe'
 
        self.u1 = UserModel().create_or_update(
 
            username=self.username, password=self.password,
 
            email=u'fork_king@rhodecode.org', name=u'u1', lastname=u'u1'
 
        )
 
        self.Session.commit()
 

	
 
    def tearDown(self):
 
        self.Session.delete(self.u1)
 
        self.Session.commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        repo_name = HG_REPO
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        self.assertTrue("""There are no forks yet""" in response.body)
 

	
 
    def test_index_with_fork(self):
 
    def test_index_with_fork_hg(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        fork_name = HG_FORK
 
        description = 'fork of vcs test'
 
        repo_name = HG_REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        response = self.app.post(url(controller='forks',
 
                                     action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'repo_name':fork_name,
 
                                     'repo_group':'',
 
                                     'fork_parent_id':org_repo.repo_id,
 
                                     'repo_type':'hg',
 
                                     'description':description,
 
                                     'private':'False'})
 
                                     'private': 'False',
 
                                     'landing_rev': 'tip'})
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        self.assertTrue("""<a href="/%s/summary">"""
 
                         """vcs_test_hg_fork</a>""" % fork_name
 
                         in response.body)
 
        response.mustcontain(
 
            """<a href="/%s/summary">%s</a>""" % (fork_name, fork_name)
 
        )
 

	
 
        #remove this fork
 
        response = self.app.delete(url('repo', repo_name=fork_name))
 

	
 
    def test_index_with_fork_git(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        fork_name = GIT_FORK
 
        description = 'fork of vcs test'
 
        repo_name = GIT_REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        response = self.app.post(url(controller='forks',
 
                                     action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'repo_name': fork_name,
 
                                     'repo_group': '',
 
                                     'fork_parent_id': org_repo.repo_id,
 
                                     'repo_type': 'git',
 
                                     'description': description,
 
                                     'private': 'False',
 
                                     'landing_rev': 'tip'})
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain(
 
            """<a href="/%s/summary">%s</a>""" % (fork_name, fork_name)
 
        )
 

	
 
        #remove this fork
 
        response = self.app.delete(url('repo', repo_name=fork_name))
 

	
 
    def test_z_fork_create(self):
 
        self.log_user()
 
        fork_name = HG_FORK
 
        description = 'fork of vcs test'
 
        repo_name = HG_REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        response = self.app.post(url(controller='forks', action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'repo_name':fork_name,
 
                                     'repo_group':'',
 
                                     'fork_parent_id':org_repo.repo_id,
 
                                     'repo_type':'hg',
 
                                     'description':description,
 
                                     'private':'False'})
 
                                     'private':'False',
 
                                     'landing_rev': 'tip'})
 

	
 
        #test if we have a message that fork is ok
 
        self.assertTrue('forked %s repository as %s' \
 
                      % (repo_name, fork_name) in response.session['flash'][0])
 
        self.checkSessionFlash(response,
 
                'forked %s repository as %s' % (repo_name, fork_name))
 

	
 
        #test if the fork was created in the database
 
        fork_repo = self.Session.query(Repository)\
 
            .filter(Repository.repo_name == fork_name).one()
 

	
 
        self.assertEqual(fork_repo.repo_name, fork_name)
 
        self.assertEqual(fork_repo.fork.repo_name, repo_name)
 

	
 
        #test if fork is visible in the list ?
 
        response = response.follow()
 

	
 
        # check if fork is marked as fork
 
        # wait for cache to expire
 
        import time
 
        time.sleep(10)
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=fork_name))
 

	
 
        self.assertTrue('Fork of %s' % repo_name in response.body)
 

	
 
    def test_zz_fork_permission_page(self):
 
        usr = self.log_user(self.username, self.password)['user_id']
 
        repo_name = HG_REPO
 

	
 
        forks = self.Session.query(Repository)\
 
            .filter(Repository.fork_id != None)\
 
            .all()
 
        self.assertEqual(1, len(forks))
 

	
 
        # set read permissions for this
 
        RepoModel().grant_user_permission(repo=forks[0],
 
                                          user=usr,
 
                                          perm='repository.read')
 
        self.Session.commit()
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain('<div style="padding:5px 3px 3px 42px;">fork of vcs test</div>')
 

	
 
    def test_zzz_fork_permission_page(self):
 
        usr = self.log_user(self.username, self.password)['user_id']
 
        repo_name = HG_REPO
 

	
 
        forks = self.Session.query(Repository)\
 
            .filter(Repository.fork_id != None)\
 
            .all()
 
        self.assertEqual(1, len(forks))
 

	
 
        # set none
 
        RepoModel().grant_user_permission(repo=forks[0],
 
                                          user=usr, perm='repository.none')
 
        self.Session.commit()
 
        # fork shouldn't be there
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 
        response.mustcontain('There are no forks yet')
rhodecode/tests/functional/test_search.py
Show inline comments
 
import os
 
from rhodecode.tests import *
 
import os
 
from nose.plugins.skip import SkipTest
 

	
 

	
 
class TestSearchController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'))
 

	
 
        self.assertTrue('class="small" id="q" name="q" type="text"' in
 
                        response.body)
 
        # Test response...
 

	
 
    def test_empty_search(self):
 
        if os.path.isdir(self.index_location):
 
            raise SkipTest('skipped due to existing index')
 
        else:
 
            self.log_user()
 
            response = self.app.get(url(controller='search', action='index'),
 
                                    {'q': HG_REPO})
 
            self.assertTrue('There is no index to search in. '
 
                            'Please run whoosh indexer' in response.body)
 

	
 
    def test_normal_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'def repo'})
 
        response.mustcontain('10 results')
 
        response.mustcontain('Permission denied')
 
        response.mustcontain('39 results')
 

	
 
    def test_repo_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'repository:%s def test' % HG_REPO})
 

	
 
        response.mustcontain('4 results')
 
        response.mustcontain('Permission denied')
rhodecode/tests/functional/test_summary.py
Show inline comments
 
@@ -47,58 +47,58 @@ class TestSummaryController(TestControll
 
    def test_index_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name=GIT_REPO))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<img style="margin-bottom:2px" class="icon" """
 
            """title="Git repository" alt="Git repository" """
 
            """src="/images/icons/giticon.png"/>"""
 
        )
 
        response.mustcontain(
 
            """<img style="margin-bottom:2px" class="icon" """
 
            """title="public repository" alt="public """
 
            """repository" src="/images/icons/lock_open.png"/>"""
 
        )
 

	
 
        # clone url...
 
        response.mustcontain("""<input style="width:80%%;margin-left:105px" type="text" id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/%s"/>""" % GIT_REPO)
 
        response.mustcontain("""<input style="display:none;width:80%%;margin-left:105px" type="text" id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_%s"/>""" % ID)
 

	
 
    def test_index_by_id_hg(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(HG_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="Mercurial repository" alt="Mercurial """
 
                        """repository" src="/images/icons/hgicon.png"/>""")
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="public repository" alt="public """
 
                        """repository" src="/images/icons/lock_open.png"/>""")
 

	
 
    def test_index_by_id_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="Git repository" alt="Git """
 
                        """repository" src="/images/icons/hgicon.png"/>""")
 
                        """repository" src="/images/icons/giticon.png"/>""")
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="public repository" alt="public """
 
                        """repository" src="/images/icons/lock_open.png"/>""")
 

	
 
    def _enable_stats(self):
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_statistics = True
 
        self.Session.add(r)
 
        self.Session.commit()
rhodecode/tests/test_models.py
Show inline comments
 
@@ -91,97 +91,98 @@ class TestReposGroups(unittest.TestCase)
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('deteteme'))
 

	
 
        sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('test1', 'deteteme'))
 

	
 
    def test_rename_single_group(self):
 
        sg1 = _make_group('initial')
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after')
 
        self.assertTrue(self.__check_path('after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
 

	
 
    def test_update_group_parent(self):
 

	
 
        sg1 = _make_group('initial', parent_id=self.g1.group_id)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
 
        self.assertTrue(self.__check_path('test1', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
 
        self.assertTrue(self.__check_path('test3', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'hello')
 
        self.assertTrue(self.__check_path('hello'))
 

	
 
        self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
 

	
 
    def test_subgrouping_with_repo(self):
 

	
 
        g1 = _make_group('g1')
 
        g2 = _make_group('g2')
 

	
 
        # create new repo
 
        form_data = dict(repo_name='john',
 
                         repo_name_full='john',
 
                         fork_name=None,
 
                         description=None,
 
                         repo_group=None,
 
                         private=False,
 
                         repo_type='hg',
 
                         clone_uri=None)
 
                         clone_uri=None,
 
                         landing_rev='tip')
 
        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        r = RepoModel().create(form_data, cur_user)
 

	
 
        self.assertEqual(r.repo_name, 'john')
 

	
 
        # put repo into group
 
        form_data = form_data
 
        form_data['repo_group'] = g1.group_id
 
        form_data['perms_new'] = []
 
        form_data['perms_updates'] = []
 
        RepoModel().update(r.repo_name, form_data)
 
        self.assertEqual(r.repo_name, 'g1/john')
 

	
 
        self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        self.assertTrue(self.__check_path('g2', 'g1'))
 

	
 
        # test repo
 
        self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name]))
 

	
 
    def test_move_to_root(self):
 
        g1 = _make_group('t11')
 
        Session.commit()
 
        g2 = _make_group('t22', parent_id=g1.group_id)
 
        Session.commit()
 

	
 
        self.assertEqual(g2.full_path, 't11/t22')
 
        self.assertTrue(self.__check_path('t11', 't22'))
 

	
 
        g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
 
        Session.commit()
 

	
 
        self.assertEqual(g2.group_name, 'g22')
 
        # we moved out group from t1 to '' so it's full path should be 'g2'
 
        self.assertEqual(g2.full_path, 'g22')
 
        self.assertFalse(self.__check_path('t11', 't22'))
 
        self.assertTrue(self.__check_path('g22'))
 

	
 

	
 
class TestUser(unittest.TestCase):
 
    def __init__(self, methodName='runTest'):
 
        Session.remove()
 
        super(TestUser, self).__init__(methodName=methodName)
 

	
 
    def test_create_and_remove(self):
 
        usr = UserModel().create_or_update(username=u'test_user', password=u'qweqwe',
 
                                     email=u'u232@rhodecode.org',
 
                                     name=u'u1', lastname=u'u1')
 
        Session.commit()
 
@@ -563,114 +564,114 @@ class TestPermissions(unittest.TestCase)
 
        new_perm_h = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
 
                                          perm=new_perm_h)
 
        Session.commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_h)
 

	
 
        # grant perm for group this should NOT override permission from user
 
        # since it's lower than granted
 
        new_perm_l = 'repository.read'
 
        RepoModel().grant_users_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_l)
 
        # check perms
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set([u'hg.create.repository', u'repository.read',
 
                           u'hg.register.manual_activate']),
 
            'repositories': {u'vcs_test_hg': u'repository.write'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_h)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_repo_in_group_permissions(self):
 
        self.g1 = _make_group('group1', skip_if_exists=True)
 
        self.g2 = _make_group('group2', skip_if_exists=True)
 
        Session.commit()
 
        # both perms should be read !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        #Change perms to none for both groups
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.anon,
 
                                                perm='group.none')
 
        ReposGroupModel().grant_user_permission(repos_group=self.g2,
 
                                                user=self.anon,
 
                                                perm='group.none')
 

	
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        # add repo to group
 
        form_data = {
 
            'repo_name':HG_REPO,
 
            'repo_name_full':RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
 
            'repo_type':'hg',
 
            'clone_uri':'',
 
            'repo_group':self.g1.group_id,
 
            'description':'desc',
 
            'private':False
 
            'private': False,
 
            'landing_rev': 'tip'
 
        }
 
        self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
 
        Session.commit()
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        #grant permission for u2 !
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.u2,
 
                                                perm='group.read')
 
        ReposGroupModel().grant_user_permission(repos_group=self.g2,
 
                                                user=self.u2,
 
                                                perm='group.read')
 
        Session.commit()
 
        self.assertNotEqual(self.u1, self.u2)
 
        #u1 and anon should have not change perms while u2 should !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        u2_auth = AuthUser(user_id=self.u2.user_id)
 
        self.assertEqual(u2_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
    def test_repo_group_user_as_user_group_member(self):
 
        # create Group1
 
        self.g1 = _make_group('group1', skip_if_exists=True)
 
        Session.commit()
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read'})
 

	
 
        # set default permission to none
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.anon,
 
                                                perm='group.none')
 
        # make group
rhodecode/tests/vcs/__init__.py
Show inline comments
 
@@ -7,50 +7,50 @@ run for each engine listed at ``conf.SCM
 

	
 
For each SCM we run tests for, we need some repository. We would use
 
repositories location from system environment variables or test suite defaults
 
- see ``conf`` module for more detail. We simply try to check if repository at
 
certain location exists, if not we would try to fetch them. At ``test_vcs`` or
 
``test_common`` we run unit tests common for each repository type and for
 
example specific mercurial tests are located at ``test_hg`` module.
 

	
 
Oh, and tests are run with ``unittest.collector`` wrapped by ``collector``
 
function at ``tests/__init__.py``.
 

	
 
.. _vcs: http://bitbucket.org/marcinkuzminski/vcs
 
.. _unittest: http://pypi.python.org/pypi/unittest
 

	
 
"""
 
import os
 
from rhodecode.lib import vcs
 
from rhodecode.lib.vcs.utils.compat import unittest
 
from rhodecode.tests import *
 
from utils import VCSTestError, SCMFetcher
 

	
 

	
 
def setup_package():
 
    """
 
    Prepares whole package for tests which mainly means it would try to fetch
 
    test repositories or use already existing ones.
 
    """
 
    fetchers = {
 
        'hg': {
 
            'alias': 'hg',
 
            'test_repo_path': TEST_HG_REPO,
 
            'remote_repo': HG_REMOTE_REPO,
 
            'clone_cmd': 'hg clone',
 
        },
 
        'git': {
 
            'alias': 'git',
 
            'test_repo_path': TEST_GIT_REPO,
 
            'remote_repo': GIT_REMOTE_REPO,
 
            'clone_cmd': 'git clone --bare',
 
        },
 
    }
 
    try:
 
        for scm, fetcher_info in fetchers.items():
 
            fetcher = SCMFetcher(**fetcher_info)
 
            fetcher.setup()
 
    except VCSTestError, err:
 
        raise RuntimeError(str(err))
 

	
 
start_dir = os.path.abspath(os.path.dirname(__file__))
 
unittest.defaultTestLoader.discover(start_dir)
 
#start_dir = os.path.abspath(os.path.dirname(__file__))
 
#unittest.defaultTestLoader.discover(start_dir)
rhodecode/tests/vcs/conf.py
Show inline comments
 
"""
 
Unit tests configuration module for vcs.
 
"""
 

	
 
import os
 
import time
 
import hashlib
 
import tempfile
 
import datetime
 

	
 
from rhodecode.tests import *
 
from utils import get_normalized_path
 
from os.path import join as jn
 

	
 
__all__ = (
 
    'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
 
    'SCM_TESTS',
 
)
 

	
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
TEST_TMP_PATH = TESTS_TMP_PATH
 
#__all__ = (
 
#    'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
 
#    'SCM_TESTS',
 
#)
 
#
 
#SCM_TESTS = ['hg', 'git']
 
#uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 
#
 
THIS = os.path.abspath(os.path.dirname(__file__))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
 
TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-git'))
 
TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
 
                            jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
 
TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
 
                            jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-hg'))
 
TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
 
                              jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
 
TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
 
                              jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
 

	
 
TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 
#
 
#GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 
#
 
#TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
 
#TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
 
#                              jn(TEST_TMP_PATH, 'vcs-git'))
 
#TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
 
#                            jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
 
#TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
 
#                            jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
 
#
 
#HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
#TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
 
#                              jn(TEST_TMP_PATH, 'vcs-hg'))
 
#TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
 
#                              jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
 
#TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
 
#                              jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
 
#
 
#TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
 
#TEST_REPO_PREFIX = 'vcs-test'
 
#
 
#
 
#def get_new_dir(title):
 
#    """
 
#    Returns always new directory path.
 
#    """
 
#    name = TEST_REPO_PREFIX
 
#    if title:
 
#        name = '-'.join((name, title))
 
#    hex = hashlib.sha1(str(time.time())).hexdigest()
 
#    name = '-'.join((name, hex))
 
#    path = os.path.join(TEST_DIR, name)
 
#    return get_normalized_path(path)
 

	
 
PACKAGE_DIR = os.path.abspath(os.path.join(
 
    os.path.dirname(__file__), '..'))
 

	
 
TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
rhodecode/tests/vcs/test_hg.py
Show inline comments
 
@@ -111,104 +111,100 @@ class MercurialRepositoryTest(unittest.T
 
                '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                '6fff84722075f1607a30f436523403845f84cd9e',
 
                '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                'be90031137367893f1c406e0a8683010fd115b79',
 
                'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                '84478366594b424af694a6c784cb991a16b87c21',
 
                '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                '2e319b85e70a707bba0beff866d9f9de032aa4f9',
 
                '786facd2c61deb9cf91e9534735124fb8fc11842',
 
                '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
 
                'aa6a0de05b7612707db567078e130a6cd114a9a7',
 
                'eada5a770da98ab0dd7325e29d00e0714f228d09',
 
                '2c1885c735575ca478bf9e17b0029dca68824458',
 
                'd9bcd465040bf869799b09ad732c04e0eea99fe9',
 
                '469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7',
 
                '4fb8326d78e5120da2c7468dcf7098997be385da',
 
                '62b4a097164940bd66030c4db51687f3ec035eed',
 
                '536c1a19428381cfea92ac44985304f6a8049569',
 
                '965e8ab3c44b070cdaa5bf727ddef0ada980ecc4',
 
                '9bb326a04ae5d98d437dece54be04f830cf1edd9',
 
                'f8940bcb890a98c4702319fbe36db75ea309b475',
 
                'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
 
                '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
 
                'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
 
        self.assertEqual(org, self.repo.revisions[:31])
 

	
 
    def test_iter_slice(self):
 
        sliced = list(self.repo[:10])
 
        itered = list(self.repo)[:10]
 
        self.assertEqual(sliced, itered)
 

	
 
    def test_slicing(self):
 
        #4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            self.assertEqual(len(revs), size)
 
            self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
 
            self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 

	
 
        #active branches
 
        self.assertTrue('default' in self.repo.branches)
 

	
 
        #closed branches
 
        self.assertFalse('web' in self.repo.branches)
 
        self.assertFalse('git' in self.repo.branches)
 
        self.assertTrue('git' in self.repo.branches)
 

	
 
        # closed
 
        self.assertTrue('workdir' in self.repo._get_branches(closed=True))
 
        self.assertTrue('webvcs' in self.repo._get_branches(closed=True))
 
        self.assertTrue('web' in self.repo._get_branches(closed=True))
 

	
 
        for name, id in self.repo.branches.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), MercurialChangeset))
 

	
 
    def test_tip_in_tags(self):
 
        # tip is always a tag
 
        self.assertIn('tip', self.repo.tags)
 

	
 
    def test_tip_changeset_in_tags(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(self.repo.tags['tip'], tip.raw_id)
 

	
 
    def test_initial_changeset(self):
 

	
 
        init_chset = self.repo.get_changeset(0)
 
        self.assertEqual(init_chset.message, 'initial import')
 
        self.assertEqual(init_chset.author,
 
            'Marcin Kuzminski <marcin@python-blog.com>')
 
        self.assertEqual(sorted(init_chset._file_paths),
 
            sorted([
 
                'vcs/__init__.py',
 
                'vcs/backends/BaseRepository.py',
 
                'vcs/backends/__init__.py',
 
            ])
 
        )
 
        self.assertEqual(sorted(init_chset._dir_paths),
 
            sorted(['', 'vcs', 'vcs/backends']))
 

	
 
        self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
 

	
 
        node = init_chset.get_node('vcs/')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs/__init__.py')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.FILE)
 

	
 
    def test_not_existing_changeset(self):
 
        #rawid
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'abcd' * 10)
 
        #shortid
rhodecode/tests/vcs_test_git.tar.gz
Show inline comments
 
new file 100644
 
binary diff not shown
rhodecode/tests/vcs_test_hg.tar.gz
Show inline comments
 
binary diff not shown
0 comments (0 inline, 0 general)