Changeset - b313d735d9c8
[Not reviewed]
default
0 12 0
domruf - 9 years ago 2016-06-12 21:21:43
dominikruf@gmail.com
cleanup: get rid of jn as shortcut for os.path.join

It is not used in that many places so it is better to be explicit and use os.path.join .

Discussed on https://bitbucket.org/domruf/kallithea/commits/1da05f42bca3f7042c444fda13a1ae1f6b9c300f#comment-2948124 .

Modified by Mads Kiilerich.
12 files changed with 50 insertions and 57 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/celerylib/__init__.py
Show inline comments
 
@@ -17,28 +17,29 @@ kallithea.lib.celerylib
 

	
 
celery libs for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 27, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import os
 
import socket
 
import traceback
 
import logging
 
from os.path import join as jn
 

	
 
from pylons import config
 

	
 
from hashlib import md5
 
from decorator import decorator
 

	
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea import CELERY_ON, CELERY_EAGER
 
from kallithea.lib.utils2 import str2bool, safe_str
 
from kallithea.lib.pidlock import DaemonLock, LockHeld
 
from kallithea.model import init_model
 
from kallithea.model import meta
 

	
 
@@ -89,25 +90,25 @@ def __get_lockkey(func, *fargs, **fkwarg
 
    lockkey = 'task_%s.lock' % \
 
        md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
 
    return lockkey
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
 
        lockkey_path = config['app_conf']['cache_dir']
 

	
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(file_=jn(lockkey_path, lockkey))
 
            l = DaemonLock(file_=os.path.join(lockkey_path, lockkey))
 
            ret = func(*fargs, **fkwargs)
 
            l.release()
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
kallithea/lib/celerylib/tasks.py
Show inline comments
 
@@ -23,25 +23,24 @@ Original author and date, and relevant c
 
:created_on: Oct 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from celery.task import task
 

	
 
import os
 
import traceback
 
import logging
 
import rfc822
 
from os.path import join as jn
 

	
 
from time import mktime
 
from operator import itemgetter
 
from string import lower
 

	
 
from pylons import config
 

	
 
from kallithea import CELERY_ON
 
from kallithea.lib.celerylib import run_task, locked_task, dbsession, \
 
    str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
 
from kallithea.lib.helpers import person
 
from kallithea.lib.rcmail.smtp_mailer import SmtpMailer
 
@@ -83,25 +82,25 @@ def whoosh_index(repo_location, full_ind
 
@task(ignore_result=True)
 
@dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
 
    log = get_logger(get_commits_stats)
 
    DBS = get_session()
 
    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    lockkey_path = config['app_conf']['cache_dir']
 

	
 
    log.info('running task with lockkey %s', lockkey)
 

	
 
    try:
 
        lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
 
        lock = l = DaemonLock(file_=os.path.join(lockkey_path, lockkey))
 

	
 
        # for js data compatibility cleans the key for person from '
 
        akc = lambda k: person(k).replace('"', "")
 

	
 
        co_day_auth_aggr = {}
 
        commits_by_day_aggregate = {}
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo is None:
 
            return True
 

	
 
        repo = repo.scm_instance
 
        repo_size = repo.count()
kallithea/lib/db_manage.py
Show inline comments
 
@@ -22,25 +22,25 @@ This file was forked by the Kallithea pr
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 10, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import time
 
import uuid
 
import logging
 
from os.path import dirname as dn, join as jn
 
from os.path import dirname as dn
 

	
 
from kallithea import __dbversion__, __py_version__, EXTERN_TYPE_INTERNAL, DB_MIGRATIONS
 
from kallithea.model.user import UserModel
 
from kallithea.lib.utils import ask_ok
 
from kallithea.model import init_model
 
from kallithea.model.db import User, Permission, Ui, \
 
    Setting, UserToPerm, DbMigrateVersion, RepoGroup, \
 
    UserRepoGroupToPerm, CacheInvalidation, Repository
 

	
 
from sqlalchemy.engine import create_engine
 
from kallithea.model.repo_group import RepoGroupModel
 
#from kallithea.model import meta
 
@@ -129,26 +129,26 @@ class DbManage(object):
 
               '********************** WARNING **********************\n'
 
               'Make sure your version of sqlite is at least 3.7.X.  \n'
 
               'Earlier versions are known to fail on some migrations\n'
 
               '*****************************************************\n')
 

	
 
        upgrade = ask_ok('You are about to perform database upgrade, make '
 
                         'sure You backed up your database before. '
 
                         'Continue ? [y/n]')
 
        if not upgrade:
 
            print 'No upgrade performed'
 
            sys.exit(0)
 

	
 
        repository_path = jn(dn(dn(dn(os.path.realpath(__file__)))),
 
                             'kallithea/lib/dbmigrate')
 
        repository_path = os.path.join(dn(dn(dn(os.path.realpath(__file__)))),
 
                                       'kallithea', 'lib', 'dbmigrate')
 
        db_uri = self.dburi
 

	
 
        try:
 
            curr_version = api.db_version(db_uri, repository_path)
 
            msg = ('Found current database under version '
 
                   'control with version %s' % curr_version)
 

	
 
        except (RuntimeError, DatabaseNotControlledError):
 
            curr_version = 1
 
            msg = ('Current database is not under version control. Setting '
 
                   'as version %s' % curr_version)
 
            api.version_control(db_uri, repository_path, curr_version)
kallithea/lib/indexers/__init__.py
Show inline comments
 
@@ -19,25 +19,25 @@ Whoosh indexing module for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 17, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import logging
 
from os.path import dirname as dn, join as jn
 
from os.path import dirname as dn
 

	
 
# Add location of top level folder to sys.path
 
sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))
 

	
 
from whoosh.analysis import RegexTokenizer, LowercaseFilter
 
from whoosh.fields import TEXT, ID, STORED, NUMERIC, BOOLEAN, Schema, FieldType, DATETIME
 
from whoosh.formats import Characters
 
from whoosh.highlight import highlight as whoosh_highlight, HtmlFormatter, ContextFragmenter
 
from kallithea.lib.utils2 import LazyProperty
 

	
 
log = logging.getLogger(__name__)
 

	
 
@@ -131,34 +131,34 @@ class WhooshResultWrapper(object):
 
        """
 
        i, j = key.start, key.stop
 

	
 
        slices = []
 
        for docid in self.doc_ids[i:j]:
 
            slices.append(self.get_full_content(docid))
 
        return slices
 

	
 
    def get_full_content(self, docid):
 
        res = self.searcher.stored_fields(docid[0])
 
        log.debug('result: %s', res)
 
        if self.search_type == 'content':
 
            full_repo_path = jn(self.repo_location, res['repository'])
 
            full_repo_path = os.path.join(self.repo_location, res['repository'])
 
            f_path = res['path'].split(full_repo_path)[-1]
 
            f_path = f_path.lstrip(os.sep)
 
            content_short = self.get_short_content(res, docid[1])
 
            res.update({'content_short': content_short,
 
                        'content_short_hl': self.highlight(content_short),
 
                        'f_path': f_path
 
            })
 
        elif self.search_type == 'path':
 
            full_repo_path = jn(self.repo_location, res['repository'])
 
            full_repo_path = os.path.join(self.repo_location, res['repository'])
 
            f_path = res['path'].split(full_repo_path)[-1]
 
            f_path = f_path.lstrip(os.sep)
 
            res.update({'f_path': f_path})
 
        elif self.search_type == 'message':
 
            res.update({'message_hl': self.highlight(res['message'])})
 

	
 
        log.debug('result: %s', res)
 

	
 
        return res
 

	
 
    def get_short_content(self, res, chunks):
 

	
kallithea/lib/indexers/daemon.py
Show inline comments
 
@@ -26,25 +26,24 @@ Original author and date, and relevant c
 
"""
 

	
 

	
 
import os
 
import sys
 
import logging
 
import traceback
 

	
 
from shutil import rmtree
 
from time import mktime
 

	
 
from os.path import dirname as dn
 
from os.path import join as jn
 

	
 
# Add location of top level folder to sys.path
 
project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
 
sys.path.append(project_path)
 

	
 
from kallithea.config.conf import INDEX_EXTENSIONS, INDEX_FILENAMES
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.db import Repository
 
from kallithea.lib.utils2 import safe_unicode, safe_str
 
from kallithea.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, \
 
    CHGSET_IDX_NAME
 

	
 
@@ -127,25 +126,25 @@ class WhooshIndexingDaemon(object):
 
        return cs
 

	
 
    def get_paths(self, repo):
 
        """
 
        recursive walk in root dir and return a set of all path in that dir
 
        based on repository walk function
 
        """
 
        index_paths_ = set()
 
        try:
 
            cs = self._get_index_changeset(repo)
 
            for _topnode, _dirs, files in cs.walk('/'):
 
                for f in files:
 
                    index_paths_.add(jn(safe_str(repo.path), safe_str(f.path)))
 
                    index_paths_.add(os.path.join(safe_str(repo.path), safe_str(f.path)))
 

	
 
        except RepositoryError:
 
            log.debug(traceback.format_exc())
 
            pass
 
        return index_paths_
 

	
 
    def get_node(self, repo, path, index_rev=None):
 
        """
 
        gets a filenode based on given full path. It operates on string for
 
        hg git compatibility.
 

	
 
        :param repo: scm repo instance
kallithea/lib/utils.py
Show inline comments
 
@@ -28,25 +28,25 @@ Original author and date, and relevant c
 
import os
 
import re
 
import logging
 
import datetime
 
import traceback
 
import paste
 
import beaker
 
import tarfile
 
import shutil
 
import decorator
 
import warnings
 
from os.path import abspath
 
from os.path import dirname as dn, join as jn
 
from os.path import dirname as dn
 

	
 
from paste.script.command import Command, BadCommand
 

	
 
from webhelpers.text import collapse, remove_formatting, strip_tags
 
from beaker.cache import _cache_decorate
 

	
 
from kallithea import BRAND
 

	
 
from kallithea.lib.vcs.utils.hgcompat import ui, config
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.exceptions import VCSError
 

	
 
@@ -644,25 +644,25 @@ def create_test_index(repo_location, con
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock, LockHeld
 

	
 
    repo_location = repo_location
 

	
 
    index_location = os.path.join(config['app_conf']['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    try:
 
        l = DaemonLock(file_=jn(dn(index_location), 'make_index.lock'))
 
        l = DaemonLock(file_=os.path.join(dn(index_location), 'make_index.lock'))
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location) \
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
 

	
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Makes a fresh database and
 
    install test repository into tmp dir
 
@@ -697,31 +697,31 @@ def create_test_env(repos_test_path, con
 

	
 
    #clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    #CREATE DEFAULT TEST REPOS
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', 'fixtures', "vcs_test_hg.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
 
    tar = tarfile.open(os.path.join(cur_dir, 'tests', 'fixtures', "vcs_test_hg.tar.gz"))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', 'fixtures', "vcs_test_git.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, GIT_REPO))
 
    tar = tarfile.open(os.path.join(cur_dir, 'tests', 'fixtures', "vcs_test_git.tar.gz"))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    #LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
#==============================================================================
 
# PASTER COMMANDS
 
#==============================================================================
 
class BasePasterCommand(Command):
 
    """
kallithea/model/scm.py
Show inline comments
 
@@ -25,25 +25,24 @@ Original author and date, and relevant c
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import posixpath
 
import re
 
import time
 
import traceback
 
import logging
 
import cStringIO
 
import pkg_resources
 
from os.path import join as jn
 

	
 
from sqlalchemy import func
 
from pylons.i18n.translation import _
 

	
 
import kallithea
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea import BACKENDS
 
@@ -730,41 +729,41 @@ class ScmModel(BaseModel):
 
        choices.extend([x[0] for x in tags_group[0]])
 

	
 
        return choices, hist_l
 

	
 
    def install_git_hooks(self, repo, force_create=False):
 
        """
 
        Creates a kallithea hook inside a git repository
 

	
 
        :param repo: Instance of VCS repo
 
        :param force_create: Create even if same name hook exists
 
        """
 

	
 
        loc = jn(repo.path, 'hooks')
 
        loc = os.path.join(repo.path, 'hooks')
 
        if not repo.bare:
 
            loc = jn(repo.path, '.git', 'hooks')
 
            loc = os.path.join(repo.path, '.git', 'hooks')
 
        if not os.path.isdir(loc):
 
            os.makedirs(loc)
 

	
 
        tmpl_post = "#!/usr/bin/env %s\n" % sys.executable or 'python2'
 
        tmpl_post += pkg_resources.resource_string(
 
            'kallithea', jn('config', 'post_receive_tmpl.py')
 
            'kallithea', os.path.join('config', 'post_receive_tmpl.py')
 
        )
 
        tmpl_pre = "#!/usr/bin/env %s\n" % sys.executable or 'python2'
 
        tmpl_pre += pkg_resources.resource_string(
 
            'kallithea', jn('config', 'pre_receive_tmpl.py')
 
            'kallithea', os.path.join('config', 'pre_receive_tmpl.py')
 
        )
 

	
 
        for h_type, tmpl in [('pre', tmpl_pre), ('post', tmpl_post)]:
 
            _hook_file = jn(loc, '%s-receive' % h_type)
 
            _hook_file = os.path.join(loc, '%s-receive' % h_type)
 
            has_hook = False
 
            log.debug('Installing git hook in repo %s', repo)
 
            if os.path.exists(_hook_file):
 
                # let's take a look at this hook, maybe it's kallithea ?
 
                log.debug('hook exists, checking if it is from kallithea')
 
                with open(_hook_file, 'rb') as f:
 
                    data = f.read()
 
                    matches = re.compile(r'(?:%s)\s*=\s*(.*)'
 
                                         % 'KALLITHEA_HOOK_VER').search(data)
 
                    if matches:
 
                        try:
 
                            ver = matches.groups()[0]
kallithea/tests/__init__.py
Show inline comments
 
@@ -20,25 +20,24 @@ This package assumes the Pylons environm
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 

	
 
Refer to docs/contributing.rst for details on running the test suite.
 
"""
 
import os
 
import re
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from tempfile import _RandomNameSequence
 

	
 
import pylons
 
import pylons.test
 
from pylons import config, url
 
from pylons.i18n.translation import _get_translator
 
from pylons.util import ContextObj
 

	
 
from routes.util import URLGenerator
 
from webtest import TestApp
 
import pytest
 
@@ -70,25 +69,25 @@ __all__ = [
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn(tempfile.gettempdir(), 'rc_test_%s' % _RandomNameSequence().next())
 
TESTS_TMP_PATH = os.path.join(tempfile.gettempdir(), 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@example.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@example.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@example.com'
 

	
 
@@ -98,42 +97,42 @@ GIT_REPO = u'vcs_test_git'
 
NEW_HG_REPO = u'vcs_test_hg_new'
 
NEW_GIT_REPO = u'vcs_test_git_new'
 

	
 
HG_FORK = u'vcs_test_hg_fork'
 
GIT_FORK = u'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 
TEST_GIT_REPO = os.path.join(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = os.path.join(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = os.path.join(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 
TEST_HG_REPO = os.path.join(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = os.path.join(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = os.path.join(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
GIT_REMOTE_REPO = os.path.join(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = os.path.join(TESTS_TMP_PATH, HG_REPO)
 

	
 
#skip ldap tests if LDAP lib is not installed
 
ldap_lib_installed = False
 
try:
 
    import ldap
 
    ldap.API_VERSION
 
    ldap_lib_installed = True
 
except ImportError:
 
    # means that python-ldap is not installed
 
    pass
 

	
 
try:
kallithea/tests/other/manual_test_vcs_operations.py
Show inline comments
 
@@ -27,25 +27,24 @@ This file was forked by the Kallithea pr
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import os
 
import re
 
import tempfile
 
import time
 
from os.path import join as jn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 

	
 
DEBUG = True
 
@@ -104,27 +103,27 @@ def _construct_url(repo, dest=None, **kw
 
    return _url
 

	
 

	
 
def _add_files_and_push(vcs, DEST, **kwargs):
 
    """
 
    Generate some files, add it to DEST repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param DEST:
 
    """
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
 
    cwd = path = os.path.join(DEST)
 
    #added_file = os.path.join(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    added_file = os.path.join(path, '%ssetup.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('%s add %s' % (vcs, added_file))
 

	
 
    email = 'me@example.com'
 
    if os.name == 'nt':
 
        author_str = 'User <%s>' % email
 
    else:
 
        author_str = 'User ǝɯɐᴎ <%s>' % email
 
    for i in xrange(kwargs.get('files_no', 3)):
 
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
        if vcs == 'hg':
kallithea/tests/scripts/manual_test_concurrency.py
Show inline comments
 
@@ -21,25 +21,24 @@ This file was forked by the Kallithea pr
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import os
 
import sys
 
import shutil
 
import logging
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from paste.deploy import appconfig
 
from sqlalchemy import engine_from_config
 

	
 
from kallithea.lib.utils import add_cache
 
from kallithea.model import init_model
 
from kallithea.model import meta
 
from kallithea.model.db import User, Repository, Ui
 
@@ -152,33 +151,33 @@ def set_anonymous_access(enable=True):
 

	
 

	
 
def get_anonymous_access():
 
    sa = get_session()
 
    return sa.query(User).filter(User.username == 'default').one().active
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 
def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
 
                                seq=None, backend='hg'):
 
    cwd = path = jn(Ui.get_by_key('paths', '/').ui_value, repo)
 
    cwd = path = os.path.join(Ui.get_by_key('paths', '/').ui_value, repo)
 

	
 
    if seq is None:
 
        seq = _RandomNameSequence().next()
 

	
 
    try:
 
        shutil.rmtree(path, ignore_errors=True)
 
        os.makedirs(path)
 
        #print 'made dirs %s' % jn(path)
 
        #print 'made dirs %s' % os.path.join(path)
 
    except OSError:
 
        raise
 

	
 
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
 
                  {'user': USER,
 
                   'pass': PASS,
 
                   'host': HOST,
 
                   'cloned_repo': repo, }
 

	
 
    dest = path + seq
 
    if method == 'pull':
 
        stdout, stderr = Command(cwd).execute(backend, method, '--cwd', dest, clone_url)
kallithea/tests/scripts/manual_test_crawler.py
Show inline comments
 
@@ -29,69 +29,68 @@ Original author and date, and relevant c
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import cookielib
 
import urllib
 
import urllib2
 
import time
 
import os
 
import sys
 
import tempfile
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
__here__ = os.path.abspath(__file__)
 
__root__ = dn(dn(dn(__here__)))
 
sys.path.append(__root__)
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib.vcs.exceptions import RepositoryError
 

	
 
PASES = 3
 
HOST = 'http://127.0.0.1'
 
PORT = 5000
 
BASE_URI = '%s:%s/' % (HOST, PORT)
 

	
 
if len(sys.argv) == 2:
 
    BASE_URI = sys.argv[1]
 

	
 
if not BASE_URI.endswith('/'):
 
    BASE_URI += '/'
 

	
 
print 'Crawling @ %s' % BASE_URI
 
BASE_URI += '%s'
 
PROJECT_PATH = jn('/', 'home', 'username', 'repos')
 
PROJECT_PATH = os.path.join('/', 'home', 'username', 'repos')
 
PROJECTS = [
 
    #'linux-magx-pbranch',
 
    'CPython',
 
    'kallithea',
 
]
 

	
 

	
 
cj = cookielib.FileCookieJar(jn(tempfile.gettempdir(), 'rc_test_cookie.txt'))
 
cj = cookielib.FileCookieJar(os.path.join(tempfile.gettempdir(), 'rc_test_cookie.txt'))
 
o = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
 
o.addheaders = [
 
    ('User-agent', 'kallithea-crawler'),
 
    ('Accept-Language', 'en - us, en;q = 0.5')
 
]
 

	
 
urllib2.install_opener(o)
 

	
 

	
 
def _get_repo(proj):
 
    if isinstance(proj, basestring):
 
        repo = vcs.get_repo(jn(PROJECT_PATH, proj))
 
        repo = vcs.get_repo(os.path.join(PROJECT_PATH, proj))
 
        proj = proj
 
    else:
 
        repo = proj
 
        proj = repo.name
 

	
 
    return repo, proj
 

	
 

	
 
def test_changelog_walk(proj, pages=100):
 
    repo, proj = _get_repo(proj)
 

	
 
    total_time = 0
 
@@ -108,25 +107,25 @@ def test_changelog_walk(proj, pages=100)
 
        size = len(f.read())
 
        e = time.time() - s
 
        total_time += e
 
        print 'visited %s size:%s req:%s ms' % (full_uri, size, e)
 

	
 
    print 'total_time', total_time
 
    print 'average on req', total_time / float(pages)
 

	
 

	
 
def test_changeset_walk(proj, limit=None):
 
    repo, proj = _get_repo(proj)
 

	
 
    print 'processing', jn(PROJECT_PATH, proj)
 
    print 'processing', os.path.join(PROJECT_PATH, proj)
 
    total_time = 0
 

	
 
    cnt = 0
 
    for i in repo:
 
        cnt += 1
 
        raw_cs = '/'.join((proj, 'changeset', i.raw_id))
 
        if limit and limit == cnt:
 
            break
 

	
 
        full_uri = (BASE_URI % raw_cs)
 
        print '%s visiting %s\%s' % (cnt, full_uri, i)
 
        s = time.time()
 
@@ -134,25 +133,25 @@ def test_changeset_walk(proj, limit=None
 
        size = len(f.read())
 
        e = time.time() - s
 
        total_time += e
 
        print '%s visited %s\%s size:%s req:%s ms' % (cnt, full_uri, i, size, e)
 

	
 
    print 'total_time', total_time
 
    print 'average on req', total_time / float(cnt)
 

	
 

	
 
def test_files_walk(proj, limit=100):
 
    repo, proj = _get_repo(proj)
 

	
 
    print 'processing', jn(PROJECT_PATH, proj)
 
    print 'processing', os.path.join(PROJECT_PATH, proj)
 
    total_time = 0
 

	
 
    paths_ = OrderedSet([''])
 
    try:
 
        tip = repo.get_changeset('tip')
 
        for topnode, dirs, files in tip.walk('/'):
 

	
 
            for dir in dirs:
 
                paths_.add(dir.path)
 
                for f in dir:
 
                    paths_.add(f.path)
 

	
 
@@ -174,18 +173,18 @@ def test_files_walk(proj, limit=100):
 
        s = time.time()
 
        f = o.open(full_uri)
 
        size = len(f.read())
 
        e = time.time() - s
 
        total_time += e
 
        print '%s visited OK size:%s req:%s ms' % (cnt, size, e)
 

	
 
    print 'total_time', total_time
 
    print 'average on req', total_time / float(cnt)
 

	
 
if __name__ == '__main__':
 
    for path in PROJECTS:
 
        repo = vcs.get_repo(jn(PROJECT_PATH, path))
 
        repo = vcs.get_repo(os.path.join(PROJECT_PATH, path))
 
        for i in range(PASES):
 
            print 'PASS %s/%s' % (i, PASES)
 
            test_changelog_walk(repo, pages=80)
 
            test_changeset_walk(repo, limit=100)
 
            test_files_walk(repo, limit=100)
kallithea/tests/vcs/conf.py
Show inline comments
 
"""
 
Unit tests configuration module for vcs.
 
"""
 
import os
 
import time
 
import hashlib
 
import tempfile
 
import datetime
 
import shutil
 
import uuid
 
from os.path import join as jn
 

	
 
__all__ = (
 
    'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
 
    'SCM_TESTS',
 
)
 

	
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
THIS = os.path.abspath(os.path.dirname(__file__))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
 
TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-git'))
 
                               os.path.join(TEST_TMP_PATH, 'vcs-git'))
 
TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
 
                            jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
 
                                     os.path.join(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
 
TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
 
                            jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
 
                                    os.path.join(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-hg'))
 
                              os.path.join(TEST_TMP_PATH, 'vcs-hg'))
 
TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
 
                              jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
 
                                    os.path.join(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
 
TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
 
                              jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
 
                                   os.path.join(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
 

	
 
TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 

	
 
def get_new_dir(title=None):
 
    """
 
    Calculates a path for a new, non-existant, unique sub-directory in TEST_DIR.
 

	
 
    Resulting directory name will have format:
 

	
 
    prefix-[title-]hexuuid
 
@@ -71,18 +70,18 @@ def get_new_dir(title=None):
 
    path = os.path.join(TEST_DIR, name)
 

	
 
    # Generate new hexes until we get a unique name (just in case).
 
    hex_uuid = uuid.uuid4().hex
 
    while os.path.exists("%s-%s" % (path, hex_uuid)):
 
        hex_uuid = uuid.uuid4().hex
 

	
 
    return "%s-%s" % (path, hex_uuid)
 

	
 

	
 
PACKAGE_DIR = os.path.abspath(os.path.join(
 
    os.path.dirname(__file__), '..'))
 
_dest = jn(TEST_TMP_PATH, 'aconfig')
 
shutil.copy(jn(THIS, 'aconfig'), _dest)
 
_dest = os.path.join(TEST_TMP_PATH, 'aconfig')
 
shutil.copy(os.path.join(THIS, 'aconfig'), _dest)
 
TEST_USER_CONFIG_FILE = _dest
 

	
 
#overide default configurations with kallithea ones
 
from kallithea.tests import *
0 comments (0 inline, 0 general)