Changeset - fa85c3df571b
[Not reviewed]
default
0 1 0
Mads Kiilerich - 9 years ago 2016-09-25 17:21:07
madski@unity3d.com
utils: remove unnecessary import of Session
1 file changed with 1 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/utils.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Utilities library for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import re
 
import logging
 
import datetime
 
import traceback
 
import paste
 
import beaker
 
import tarfile
 
import shutil
 
import decorator
 
import warnings
 
from os.path import abspath
 
from os.path import dirname
 

	
 
from webhelpers.text import collapse, remove_formatting, strip_tags
 
from beaker.cache import _cache_decorate
 

	
 
from kallithea.lib.vcs.utils.hgcompat import ui, config
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.exceptions import VCSError
 

	
 
from kallithea.model import meta
 
from kallithea.model.db import Repository, User, Ui, \
 
    UserLog, RepoGroup, Setting, UserGroup
 
from kallithea.model.meta import Session
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_current_authuser
 
from kallithea.lib.vcs.utils.fakemod import create_module
 

	
 
log = logging.getLogger(__name__)
 

	
 
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return slug of name of repository
 
    This function is called on each creation/modification
 
    of repository to prevent bad names in repo
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in """`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
 

	
 

	
 
#==============================================================================
 
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
 
#==============================================================================
 
def get_repo_slug(request):
 
    _repo = request.environ['pylons.routes_dict'].get('repo_name')
 
    if _repo:
 
        _repo = _repo.rstrip('/')
 
    return _repo
 

	
 

	
 
def get_repo_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('group_name')
 
    if _group:
 
        _group = _group.rstrip('/')
 
    return _group
 

	
 

	
 
def get_user_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('id')
 
    _group = UserGroup.get(_group)
 
    if _group:
 
        return _group.users_group_name
 
    return None
 

	
 

	
 
def _extract_id_from_repo_name(repo_name):
 
    if repo_name.startswith('/'):
 
        repo_name = repo_name.lstrip('/')
 
    by_id_match = re.match(r'^_(\d{1,})', repo_name)
 
    if by_id_match:
 
        return by_id_match.groups()[0]
 

	
 

	
 
def get_repo_by_id(repo_name):
 
    """
 
    Extracts repo_name by id from special urls. Example url is _11/repo_name
 

	
 
    :param repo_name:
 
    :return: repo_name if matched else None
 
    """
 
    _repo_id = _extract_id_from_repo_name(repo_name)
 
    if _repo_id:
 
        from kallithea.model.db import Repository
 
        repo = Repository.get(_repo_id)
 
        if repo:
 
            # TODO: return repo instead of reponame? or would that be a layering violation?
 
            return repo.repo_name
 
    return None
 

	
 

	
 
def action_logger(user, action, repo, ipaddr='', sa=None, commit=False):
 
@@ -556,193 +555,193 @@ def load_rcextensions(root_path):
 
    path = os.path.join(root_path, 'rcextensions', '__init__.py')
 
    if os.path.isfile(path):
 
        rcext = create_module('rc', path)
 
        EXT = kallithea.EXTENSIONS = rcext
 
        log.debug('Found rcextensions now loading %s...', rcext)
 

	
 
        # Additional mappings that are not present in the pygments lexers
 
        conf.LANGUAGES_EXTENSIONS_MAP.update(getattr(EXT, 'EXTRA_MAPPINGS', {}))
 

	
 
        #OVERRIDE OUR EXTENSIONS FROM RC-EXTENSIONS (if present)
 

	
 
        if getattr(EXT, 'INDEX_EXTENSIONS', []):
 
            log.debug('settings custom INDEX_EXTENSIONS')
 
            conf.INDEX_EXTENSIONS = getattr(EXT, 'INDEX_EXTENSIONS', [])
 

	
 
        #ADDITIONAL MAPPINGS
 
        log.debug('adding extra into INDEX_EXTENSIONS')
 
        conf.INDEX_EXTENSIONS.extend(getattr(EXT, 'EXTRA_INDEX_EXTENSIONS', []))
 

	
 
        # auto check if the module is not missing any data, set to default if is
 
        # this will help autoupdate new feature of rcext module
 
        #from kallithea.config import rcextensions
 
        #for k in dir(rcextensions):
 
        #    if not k.startswith('_') and not hasattr(EXT, k):
 
        #        setattr(EXT, k, getattr(rcextensions, k))
 

	
 

	
 
def get_custom_lexer(extension):
 
    """
 
    returns a custom lexer if it's defined in rcextensions module, or None
 
    if there's no custom lexer defined
 
    """
 
    import kallithea
 
    from pygments import lexers
 
    #check if we didn't define this extension as other lexer
 
    if kallithea.EXTENSIONS and extension in kallithea.EXTENSIONS.EXTRA_LEXERS:
 
        _lexer_name = kallithea.EXTENSIONS.EXTRA_LEXERS[extension]
 
        return lexers.get_lexer_by_name(_lexer_name)
 

	
 

	
 
#==============================================================================
 
# TEST FUNCTIONS AND CREATORS
 
#==============================================================================
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Makes default test index
 

	
 
    :param config: test config
 
    :param full_index:
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock, LockHeld
 

	
 
    repo_location = repo_location
 

	
 
    index_location = os.path.join(config['app_conf']['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    try:
 
        l = DaemonLock(file_=os.path.join(dirname(index_location), 'make_index.lock'))
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location) \
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
 

	
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Makes a fresh database and
 
    install test repository into tmp dir
 
    """
 
    from kallithea.lib.db_manage import DbManage
 
    from kallithea.tests.base import HG_REPO, GIT_REPO, TESTS_TMP_PATH
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.url']
 
    log.debug('making test db %s', dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s', repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    # for tests dynamically set new root paths based on generated content
 
    dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session().commit()
 
    meta.Session().commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['app_conf']['index_dir']
 
    data_path = config['app_conf']['cache_dir']
 

	
 
    #clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    #CREATE DEFAULT TEST REPOS
 
    cur_dir = dirname(dirname(abspath(__file__)))
 
    tar = tarfile.open(os.path.join(cur_dir, 'tests', 'fixtures', "vcs_test_hg.tar.gz"))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    cur_dir = dirname(dirname(abspath(__file__)))
 
    tar = tarfile.open(os.path.join(cur_dir, 'tests', 'fixtures', "vcs_test_git.tar.gz"))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    #LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
def check_git_version():
 
    """
 
    Checks what version of git is installed in system, and issues a warning
 
    if it's too old for Kallithea to work properly.
 
    """
 
    from kallithea import BACKENDS
 
    from kallithea.lib.vcs.backends.git.repository import GitRepository
 
    from kallithea.lib.vcs.conf import settings
 
    from distutils.version import StrictVersion
 

	
 
    if 'git' not in BACKENDS:
 
        return None
 

	
 
    stdout, stderr = GitRepository._run_git_command(['--version'], _bare=True,
 
                                                    _safe=True)
 

	
 
    m = re.search("\d+.\d+.\d+", stdout)
 
    if m:
 
        ver = StrictVersion(m.group(0))
 
    else:
 
        ver = StrictVersion('0.0.0')
 

	
 
    req_ver = StrictVersion('1.7.4')
 

	
 
    log.debug('Git executable: "%s" version %s detected: %s',
 
              settings.GIT_EXECUTABLE_PATH, ver, stdout)
 
    if stderr:
 
        log.warning('Error detecting git version: %r', stderr)
 
    elif ver < req_ver:
 
        log.warning('Kallithea detected git version %s, which is too old '
 
                    'for the system to function properly. '
 
                    'Please upgrade to version %s or later.' % (ver, req_ver))
 
    return ver
 

	
 

	
 
@decorator.decorator
 
def jsonify(func, *args, **kwargs):
 
    """Action decorator that formats output for JSON
 

	
 
    Given a function that will return content, this decorator will turn
 
    the result into JSON, with a content-type of 'application/json' and
 
    output it.
 

	
 
    """
 
    from pylons.decorators.util import get_pylons
 
    from kallithea.lib.compat import json
 
    pylons = get_pylons(args)
 
    pylons.response.headers['Content-Type'] = 'application/json; charset=utf-8'
 
    data = func(*args, **kwargs)
 
    if isinstance(data, (list, tuple)):
 
        msg = "JSON responses with Array envelopes are susceptible to " \
 
              "cross-site data leak attacks, see " \
 
              "http://wiki.pylonshq.com/display/pylonsfaq/Warnings"
 
        warnings.warn(msg, Warning, 2)
 
        log.warning(msg)
 
    log.debug("Returning JSON wrapped action output")
 
    return json.dumps(data, encoding='utf-8')
 

	
 

	
 
#===============================================================================
 
# CACHE RELATED METHODS
 
#===============================================================================
 

	
 
# set cache regions for beaker so celery can utilise it
 
def setup_cache_regions(settings):
0 comments (0 inline, 0 general)