Changeset - e51ad2cd400e
[Not reviewed]
kallithea/bin/kallithea_cli_repo.py
Show inline comments
 
@@ -19,25 +19,25 @@ Original author and date, and relevant c
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import datetime
 
import os
 
import re
 
import shutil
 

	
 
import click
 

	
 
import kallithea.bin.kallithea_cli_base as cli_base
 
from kallithea.lib.utils import REMOVED_REPO_PAT, repo2db_mapper
 
from kallithea.lib.utils2 import ask_ok, safe_str
 
from kallithea.lib.utils2 import ask_ok
 
from kallithea.model.db import Repository, Ui
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.option('--remove-missing', is_flag=True,
 
        help='Remove missing repositories from the Kallithea database.')
 
def repo_scan(remove_missing):
 
    """Scan filesystem for repositories.
 

	
 
    Search the configured repository root for new repositories and add them
 
@@ -118,25 +118,25 @@ def repo_purge_deleted(ask, older_than):
 
    def _extract_date(name):
 
        """
 
        Extract the date part from rm__<date> pattern of removed repos,
 
        and convert it to datetime object
 

	
 
        :param name:
 
        """
 
        date_part = name[4:19]  # 4:19 since we don't parse milliseconds
 
        return datetime.datetime.strptime(date_part, '%Y%m%d_%H%M%S')
 

	
 
    repos_location = Ui.get_repos_location()
 
    to_remove = []
 
    for dn_, dirs, f in os.walk(safe_str(repos_location)):
 
    for dn_, dirs, f in os.walk(repos_location):
 
        alldirs = list(dirs)
 
        del dirs[:]
 
        if ('.hg' in alldirs or
 
            '.git' in alldirs or
 
            '.svn' in alldirs or
 
            'objects' in alldirs and ('refs' in alldirs or 'packed-refs' in f)
 
        ):
 
            continue
 
        for loc in alldirs:
 
            if REMOVED_REPO_PAT.match(loc):
 
                to_remove.append([os.path.join(dn_, loc),
 
                                  _extract_date(loc)])
 
@@ -166,22 +166,21 @@ def repo_purge_deleted(ask, older_than):
 
                    % (older_than, older_than_date))
 
            return
 

	
 
        click.echo('Considering %s deleted repositories older than %s (%s).'
 
            % (len(to_remove), older_than, older_than_date))
 
    else:
 
        click.echo('Considering %s deleted repositories.' % len(to_remove))
 

	
 
    if not ask:
 
        remove = True
 
    else:
 
        remove = ask_ok('The following repositories will be removed completely:\n%s\n'
 
                'Do you want to proceed? [y/n] '
 
                % '\n'.join(['%s deleted on %s' % (safe_str(x[0]), safe_str(x[1]))
 
                                     for x in to_remove]))
 
            'Do you want to proceed? [y/n] ' %
 
            '\n'.join('%s deleted on %s' % (path, date_) for path, date_ in to_remove))
 

	
 
    if remove:
 
        for path, date_ in to_remove:
 
            click.echo('Purging repository %s' % path)
 
            shutil.rmtree(path)
 
    else:
 
        click.echo('Nothing done, exiting...')
kallithea/controllers/api/__init__.py
Show inline comments
 
@@ -30,39 +30,39 @@ import itertools
 
import logging
 
import time
 
import traceback
 
import types
 

	
 
from tg import Response, TGController, request, response
 
from webob.exc import HTTPError, HTTPException
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.base import _get_ip_addr as _get_ip
 
from kallithea.lib.base import get_path_info
 
from kallithea.lib.utils2 import ascii_bytes, safe_str
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model.db import User
 

	
 

	
 
log = logging.getLogger('JSONRPC')
 

	
 

	
 
class JSONRPCError(BaseException):
 

	
 
    def __init__(self, message):
 
        self.message = message
 
        super(JSONRPCError, self).__init__()
 

	
 
    def __str__(self):
 
        return safe_str(self.message)
 
        return self.message
 

	
 

	
 
class JSONRPCErrorResponse(Response, HTTPException):
 
    """
 
    Generate a Response object with a JSON-RPC error body
 
    """
 

	
 
    def __init__(self, message=None, retid=None, code=None):
 
        HTTPException.__init__(self, message, self)
 
        Response.__init__(self,
 
                          json_body=dict(id=retid, result=None, error=message),
 
                          status=code,
kallithea/controllers/compare.py
Show inline comments
 
@@ -34,25 +34,25 @@ import mercurial.unionrepo
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPFound, HTTPNotFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.controllers.changeset import _context_url, _ignorews_url
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.lib.utils2 import ascii_bytes, ascii_str, safe_bytes, safe_int, safe_str
 
from kallithea.lib.utils2 import ascii_bytes, ascii_str, safe_bytes, safe_int
 
from kallithea.model.db import Repository
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class CompareController(BaseRepoController):
 

	
 
    def _before(self, *args, **kwargs):
 
        super(CompareController, self)._before(*args, **kwargs)
 

	
 
        # The base repository has already been retrieved.
 
@@ -126,28 +126,28 @@ class CompareController(BaseRepoControll
 
                org_repo.get_changeset(ascii_str(hgrepo[rev].hex()))
 
                for rev in hgrepo.revs(
 
                    b"ancestors(id(%s)) and not ancestors(id(%s)) and not id(%s)",
 
                    ascii_bytes(org_rev), ascii_bytes(other_rev), ascii_bytes(other_rev))
 
            ]
 

	
 
        elif alias == 'git':
 
            if org_repo != other_repo:
 
                from dulwich.repo import Repo
 
                from dulwich.client import SubprocessGitClient
 

	
 
                gitrepo = Repo(org_repo.path)
 
                SubprocessGitClient(thin_packs=False).fetch(safe_str(other_repo.path), gitrepo)
 
                SubprocessGitClient(thin_packs=False).fetch(other_repo.path, gitrepo)
 

	
 
                gitrepo_remote = Repo(other_repo.path)
 
                SubprocessGitClient(thin_packs=False).fetch(safe_str(org_repo.path), gitrepo_remote)
 
                SubprocessGitClient(thin_packs=False).fetch(org_repo.path, gitrepo_remote)
 

	
 
                revs = [
 
                    ascii_str(x.commit.id)
 
                    for x in gitrepo_remote.get_walker(include=[ascii_bytes(other_rev)],
 
                                                       exclude=[ascii_bytes(org_rev)])
 
                ]
 
                other_changesets = [other_repo.get_changeset(rev) for rev in reversed(revs)]
 
                if other_changesets:
 
                    ancestors = [other_changesets[0].parents[0].raw_id]
 
                else:
 
                    # no changesets from other repo, ancestor is the other_rev
 
                    ancestors = [other_rev]
kallithea/controllers/files.py
Show inline comments
 
@@ -37,25 +37,25 @@ from tg import request, response
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound, HTTPNotFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.controllers.changeset import _context_url, _ignorews_url, anchor_url, get_ignore_ws, get_line_ctx
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.exceptions import NonRelativePathError
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import convert_line_endings, detect_mode, safe_int, safe_str, safe_unicode, str2bool
 
from kallithea.lib.utils2 import convert_line_endings, detect_mode, safe_int, safe_unicode, str2bool
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (
 
    ChangesetDoesNotExistError, ChangesetError, EmptyRepositoryError, ImproperArchiveTypeError, NodeAlreadyExistsError, NodeDoesNotExistError, NodeError, RepositoryError, VCSError)
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
@@ -223,26 +223,26 @@ class FilesController(BaseRepoController
 
            file_history, _hist = self._get_node_history(changeset, f_path)
 
            c.authors = []
 
            for a in set([x.author for x in _hist]):
 
                c.authors.append((h.email(a), h.person(a)))
 
            return render('files/files_history_box.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def rawfile(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        response.content_disposition = 'attachment; filename=%s' % \
 
            safe_str(f_path.split(Repository.url_sep())[-1])
 
        response.content_disposition = \
 
            'attachment; filename=%s' % f_path.split(Repository.url_sep())[-1]
 

	
 
        response.content_type = file_node.mimetype
 
        return file_node.content
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def raw(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        raw_mimetype_mapping = {
 
            # map original mimetype to a mimetype used for "show as raw"
 
@@ -268,26 +268,25 @@ class FilesController(BaseRepoController
 
                mimetype, dispo = 'application/octet-stream', 'attachment'
 
            else:
 
                # do not just use the original mimetype, but force text/plain,
 
                # otherwise it would serve text/html and that might be unsafe.
 
                # Note: underlying vcs library fakes text/plain mimetype if the
 
                # mimetype can not be determined and it thinks it is not
 
                # binary.This might lead to erroneous text display in some
 
                # cases, but helps in other cases, like with text files
 
                # without extension.
 
                mimetype, dispo = 'text/plain', 'inline'
 

	
 
        if dispo == 'attachment':
 
            dispo = 'attachment; filename=%s' % \
 
                        safe_str(f_path.split(os.sep)[-1])
 
            dispo = 'attachment; filename=%s' % f_path.split(os.sep)[-1]
 

	
 
        response.content_disposition = dispo
 
        response.content_type = mimetype
 
        return file_node.content
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('write')
 
    def delete(self, repo_name, revision, f_path):
 
        repo = c.db_repo
 
        # check if revision is a branch identifier- basically we cannot
 
        # create multiple heads via file editing
 
        _branches = repo.scm_instance.branches
 
@@ -499,26 +498,25 @@ class FilesController(BaseRepoController
 

	
 
            cs = c.db_repo_scm_instance.get_changeset(revision)
 
            content_type = settings.ARCHIVE_SPECS[fileformat][0]
 
        except ChangesetDoesNotExistError:
 
            return _('Unknown revision %s') % revision
 
        except EmptyRepositoryError:
 
            return _('Empty repository')
 
        except (ImproperArchiveTypeError, KeyError):
 
            return _('Unknown archive type')
 

	
 
        from kallithea import CONFIG
 
        rev_name = cs.raw_id[:12]
 
        archive_name = '%s-%s%s' % (safe_str(repo_name.replace('/', '_')),
 
                                    safe_str(rev_name), ext)
 
        archive_name = '%s-%s%s' % (repo_name.replace('/', '_'), rev_name, ext)
 

	
 
        archive_path = None
 
        cached_archive_path = None
 
        archive_cache_dir = CONFIG.get('archive_cache_dir')
 
        if archive_cache_dir and not subrepos: # TODO: subrepo caching?
 
            if not os.path.isdir(archive_cache_dir):
 
                os.makedirs(archive_cache_dir)
 
            cached_archive_path = os.path.join(archive_cache_dir, archive_name)
 
            if os.path.isfile(cached_archive_path):
 
                log.debug('Found cached archive in %s', cached_archive_path)
 
                archive_path = cached_archive_path
 
            else:
kallithea/controllers/login.py
Show inline comments
 
@@ -32,25 +32,24 @@ import re
 
import formencode
 
from formencode import htmlfill
 
from tg import request, session
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPFound
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.config.routing import url
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator
 
from kallithea.lib.base import BaseController, log_in_user, render
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.model.db import Setting, User
 
from kallithea.model.forms import LoginForm, PasswordResetConfirmationForm, PasswordResetRequestForm, RegisterForm
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class LoginController(BaseController):
 

	
 
    def _validate_came_from(self, came_from,
 
@@ -59,25 +58,25 @@ class LoginController(BaseController):
 

	
 
        Determines if a URI reference is valid and relative to the origin;
 
        or in RFC 3986 terms, whether it matches this production:
 

	
 
          origin-relative-ref = path-absolute [ "?" query ] [ "#" fragment ]
 

	
 
        with the exception that '%' escapes are not validated and '#' is
 
        allowed inside the fragment part.
 
        """
 
        return _re.match(came_from) is not None
 

	
 
    def index(self):
 
        c.came_from = safe_str(request.GET.get('came_from', ''))
 
        c.came_from = request.GET.get('came_from', '')
 
        if c.came_from:
 
            if not self._validate_came_from(c.came_from):
 
                log.error('Invalid came_from (not server-relative): %r', c.came_from)
 
                raise HTTPBadRequest()
 
        else:
 
            c.came_from = url('home')
 

	
 
        if request.POST:
 
            # import Login Form validator class
 
            login_form = LoginForm()()
 
            try:
 
                c.form_result = login_form.to_python(dict(request.POST))
kallithea/controllers/pullrequests.py
Show inline comments
 
@@ -34,25 +34,25 @@ from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPForbidden, HTTPFound, HTTPNotFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.controllers.changeset import _context_url, _ignorews_url, create_cs_pr_comment, delete_cs_pr_comment
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import ascii_bytes, safe_bytes, safe_int, safe_str
 
from kallithea.lib.utils2 import ascii_bytes, safe_bytes, safe_int
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, EmptyRepositoryError
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.db import ChangesetStatus, PullRequest, PullRequestReviewer, Repository, User
 
from kallithea.model.forms import PullRequestForm, PullRequestPostForm
 
from kallithea.model.meta import Session
 
from kallithea.model.pull_request import CreatePullRequestAction, CreatePullRequestIterationAction, PullRequestModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
@@ -73,30 +73,24 @@ def _get_reviewer(user_id):
 
class PullrequestsController(BaseRepoController):
 

	
 
    def _get_repo_refs(self, repo, rev=None, branch=None, branch_rev=None):
 
        """return a structure with repo's interesting changesets, suitable for
 
        the selectors in pullrequest.html
 

	
 
        rev: a revision that must be in the list somehow and selected by default
 
        branch: a branch that must be in the list and selected by default - even if closed
 
        branch_rev: a revision of which peers should be preferred and available."""
 
        # list named branches that has been merged to this named branch - it should probably merge back
 
        peers = []
 

	
 
        if rev:
 
            rev = safe_str(rev)
 

	
 
        if branch:
 
            branch = safe_str(branch)
 

	
 
        if branch_rev:
 
            # a revset not restricting to merge() would be better
 
            # (especially because it would get the branch point)
 
            # ... but is currently too expensive
 
            # including branches of children could be nice too
 
            peerbranches = set()
 
            for i in repo._repo.revs(
 
                b"sort(parents(branch(id(%s)) and merge()) - branch(id(%s)), -rev)",
 
                ascii_bytes(branch_rev), ascii_bytes(branch_rev),
 
            ):
 
                for abranch in repo.get_changeset(i).branches:
 
                    if abranch not in peerbranches:
 
@@ -577,25 +571,25 @@ class PullrequestsController(BaseRepoCon
 

	
 
        ignore_whitespace = request.GET.get('ignorews') == '1'
 
        line_context = safe_int(request.GET.get('context'), 3)
 
        c.ignorews_url = _ignorews_url
 
        c.context_url = _context_url
 
        fulldiff = request.GET.get('fulldiff')
 
        diff_limit = None if fulldiff else self.cut_off_limit
 

	
 
        # we swap org/other ref since we run a simple diff on one repo
 
        log.debug('running diff between %s and %s in %s',
 
                  c.a_rev, c.cs_rev, org_scm_instance.path)
 
        try:
 
            raw_diff = diffs.get_diff(org_scm_instance, rev1=safe_str(c.a_rev), rev2=safe_str(c.cs_rev),
 
            raw_diff = diffs.get_diff(org_scm_instance, rev1=c.a_rev, rev2=c.cs_rev,
 
                                      ignore_whitespace=ignore_whitespace, context=line_context)
 
        except ChangesetDoesNotExistError:
 
            raw_diff = safe_bytes(_("The diff can't be shown - the PR revisions could not be found."))
 
        diff_processor = diffs.DiffProcessor(raw_diff, diff_limit=diff_limit)
 
        c.limited_diff = diff_processor.limited_diff
 
        c.file_diff_data = []
 
        c.lines_added = 0
 
        c.lines_deleted = 0
 

	
 
        for f in diff_processor.parsed:
 
            st = f['stats']
 
            c.lines_added += st['added']
kallithea/controllers/search.py
Show inline comments
 
@@ -30,25 +30,25 @@ import traceback
 

	
 
from tg import config, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from whoosh.index import EmptyIndexError, exists_in, open_dir
 
from whoosh.qparser import QueryParser, QueryParserError
 
from whoosh.query import Phrase, Prefix
 

	
 
from kallithea.lib.auth import LoginRequired
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.indexers import CHGSET_IDX_NAME, CHGSETS_SCHEMA, IDX_NAME, SCHEMA, WhooshResultWrapper
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import safe_int, safe_str
 
from kallithea.lib.utils2 import safe_int
 
from kallithea.model.repo import RepoModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SearchController(BaseRepoController):
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def index(self, repo_name=None):
 
        c.repo_name = repo_name
 
        c.formated_results = []
 
@@ -115,26 +115,26 @@ class SearchController(BaseRepoControlle
 
                    res_ln = len(results)
 
                    c.runtime = '%s results (%.3f seconds)' % (
 
                        res_ln, results.runtime
 
                    )
 

	
 
                    repo_location = RepoModel().repos_path
 
                    c.formated_results = Page(
 
                        WhooshResultWrapper(search_type, searcher, matcher,
 
                                            highlight_items, repo_location),
 
                        page=p,
 
                        item_count=res_ln,
 
                        items_per_page=10,
 
                        type=safe_str(c.cur_type),
 
                        q=safe_str(c.cur_query),
 
                        type=c.cur_type,
 
                        q=c.cur_query,
 
                    )
 

	
 
                except QueryParserError:
 
                    c.runtime = _('Invalid search query. Try quoting it.')
 
                searcher.close()
 
            except EmptyIndexError:
 
                log.error("Empty search index - run 'kallithea-cli index-create' regularly")
 
                c.runtime = _('The server has no search index.')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                c.runtime = _('An error occurred during search operation.')
 

	
kallithea/lib/auth_modules/auth_container.py
Show inline comments
 
@@ -20,25 +20,25 @@ Kallithea container based authentication
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Nov 17, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 

	
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import hybrid_property
 
from kallithea.lib.utils2 import safe_str, str2bool
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.model.db import Setting
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
 
    def __init__(self):
 
        pass
 

	
 
    @hybrid_property
 
    def name(self):
 
@@ -171,25 +171,25 @@ class KallitheaAuthPlugin(auth_modules.K
 
        environ = kwargs.get('environ')
 
        if not environ:
 
            log.debug('Empty environ data skipping...')
 
            return None
 

	
 
        if not userobj:
 
            userobj = self.get_user('', environ=environ, settings=settings)
 

	
 
        # we don't care passed username/password for container auth plugins.
 
        # only way to log in is using environ
 
        username = None
 
        if userobj:
 
            username = safe_str(getattr(userobj, 'username'))
 
            username = getattr(userobj, 'username')
 

	
 
        if not username:
 
            # we don't have any objects in DB, user doesn't exist, extract
 
            # username from environ based on the settings
 
            username = self._get_username(environ, settings)
 

	
 
        # if cannot fetch username, it's a no-go for this plugin to proceed
 
        if not username:
 
            return None
 

	
 
        # old attrs fetched from Kallithea database
 
        admin = getattr(userobj, 'admin', False)
kallithea/lib/auth_modules/auth_ldap.py
Show inline comments
 
@@ -22,25 +22,24 @@ Original author and date, and relevant c
 
:created_on: Created on Nov 17, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 

	
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import hybrid_property
 
from kallithea.lib.exceptions import LdapConnectionError, LdapImportError, LdapPasswordError, LdapUsernameError
 
from kallithea.lib.utils2 import safe_str
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
try:
 
    import ldap
 
    import ldap.filter
 
except ImportError:
 
    # means that python-ldap is not installed
 
    ldap = None
 

	
 

	
 
@@ -61,29 +60,29 @@ class AuthLdap(object):
 
                                   OPT_X_TLS_DEMAND)
 
        self.cacertdir = cacertdir
 

	
 
        protocol = 'ldaps' if self.TLS_KIND == 'LDAPS' else 'ldap'
 
        if not port:
 
            port = 636 if self.TLS_KIND == 'LDAPS' else 389
 
        self.LDAP_SERVER = str(', '.join(
 
            "%s://%s:%s" % (protocol,
 
                            host.strip(),
 
                            port)
 
            for host in server.split(',')))
 

	
 
        self.LDAP_BIND_DN = safe_str(bind_dn)
 
        self.LDAP_BIND_PASS = safe_str(bind_pass)
 
        self.LDAP_BIND_DN = bind_dn
 
        self.LDAP_BIND_PASS = bind_pass
 

	
 
        self.BASE_DN = safe_str(base_dn)
 
        self.LDAP_FILTER = safe_str(ldap_filter)
 
        self.BASE_DN = base_dn
 
        self.LDAP_FILTER = ldap_filter
 
        self.SEARCH_SCOPE = getattr(ldap, 'SCOPE_%s' % search_scope)
 
        self.attr_login = attr_login
 

	
 
    def authenticate_ldap(self, username, password):
 
        """
 
        Authenticate a user via LDAP and return his/her LDAP properties.
 

	
 
        Raises AuthenticationError if the credentials are rejected, or
 
        EnvironmentError if the LDAP server can't be reached.
 

	
 
        :param username: username
 
        :param password: password
 
@@ -130,25 +129,25 @@ class AuthLdap(object):
 
            lobjects = server.search_ext_s(self.BASE_DN, self.SEARCH_SCOPE,
 
                                           filter_)
 

	
 
            if not lobjects:
 
                raise ldap.NO_SUCH_OBJECT()
 

	
 
            for (dn, _attrs) in lobjects:
 
                if dn is None:
 
                    continue
 

	
 
                try:
 
                    log.debug('Trying simple bind with %s', dn)
 
                    server.simple_bind_s(dn, safe_str(password))
 
                    server.simple_bind_s(dn, password)
 
                    results = server.search_ext_s(dn, ldap.SCOPE_BASE,
 
                                                  '(objectClass=*)')
 
                    if len(results) == 1:
 
                        dn_, attrs = results[0]
 
                        assert dn_ == dn
 
                        return dn, attrs
 

	
 
                except ldap.INVALID_CREDENTIALS:
 
                    log.debug("LDAP rejected password for user '%s': %s",
 
                              username, dn)
 
                    continue # accept authentication as another ldap user with same username
 

	
kallithea/lib/base.py
Show inline comments
 
@@ -40,25 +40,25 @@ import paste.httpexceptions
 
import paste.httpheaders
 
import webob.exc
 
from tg import TGController, config, render_template, request, response, session
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 

	
 
from kallithea import BACKENDS, __version__
 
from kallithea.config.routing import url
 
from kallithea.lib import auth_modules, ext_json
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyMiddleware
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils import get_repo_slug, is_valid_repo
 
from kallithea.lib.utils2 import AttributeDict, ascii_bytes, safe_int, safe_str, safe_unicode, set_hook_environment, str2bool
 
from kallithea.lib.utils2 import AttributeDict, ascii_bytes, safe_int, safe_unicode, set_hook_environment, str2bool
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError
 
from kallithea.model import meta
 
from kallithea.model.db import PullRequest, Repository, Setting, User
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def render(template_path):
 
    return render_template({'url': url}, 'mako', template_path)
 

	
 
@@ -233,25 +233,25 @@ class BaseVCSController(object):
 
        # NEED TO AUTHENTICATE AND ASK FOR AUTH USER PERMISSIONS
 
        #==============================================================
 

	
 
        # try to auth based on environ, container auth methods
 
        log.debug('Running PRE-AUTH for container based authentication')
 
        pre_auth = auth_modules.authenticate('', '', environ)
 
        if pre_auth is not None and pre_auth.get('username'):
 
            username = pre_auth['username']
 
        log.debug('PRE-AUTH got %s as username', username)
 

	
 
        # If not authenticated by the container, running basic auth
 
        if not username:
 
            self.authenticate.realm = safe_str(self.config['realm'])
 
            self.authenticate.realm = self.config['realm']
 
            result = self.authenticate(environ)
 
            if isinstance(result, str):
 
                paste.httpheaders.AUTH_TYPE.update(environ, 'basic')
 
                paste.httpheaders.REMOTE_USER.update(environ, result)
 
                username = result
 
            else:
 
                return None, result.wsgi_application
 

	
 
        #==============================================================
 
        # CHECK PERMISSIONS FOR THIS REQUEST USING GIVEN USERNAME
 
        #==============================================================
 
        try:
 
@@ -324,25 +324,25 @@ class BaseVCSController(object):
 
            user, response_app = self._authorize(environ, parsed_request.action, parsed_request.repo_name, ip_addr)
 
            if response_app is not None:
 
                return response_app(environ, start_response)
 

	
 
            #======================================================================
 
            # REQUEST HANDLING
 
            #======================================================================
 
            set_hook_environment(user.username, ip_addr,
 
                parsed_request.repo_name, self.scm_alias, parsed_request.action)
 

	
 
            try:
 
                log.info('%s action on %s repo "%s" by "%s" from %s',
 
                         parsed_request.action, self.scm_alias, parsed_request.repo_name, safe_str(user.username), ip_addr)
 
                         parsed_request.action, self.scm_alias, parsed_request.repo_name, user.username, ip_addr)
 
                app = self._make_app(parsed_request)
 
                return app(environ, start_response)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise webob.exc.HTTPInternalServerError()
 

	
 
        except webob.exc.HTTPException as e:
 
            return e(environ, start_response)
 

	
 

	
 
class BaseController(TGController):
 

	
kallithea/lib/caching_query.py
Show inline comments
 
@@ -15,26 +15,24 @@ The three new concepts introduced here a
 
   a Query.
 

	
 
The rest of what's here are standard SQLAlchemy and
 
Beaker constructs.
 

	
 
"""
 
import beaker
 
from beaker.exceptions import BeakerException
 
from sqlalchemy.orm.interfaces import MapperOption
 
from sqlalchemy.orm.query import Query
 
from sqlalchemy.sql import visitors
 

	
 
from kallithea.lib.utils2 import safe_str
 

	
 

	
 
class CachingQuery(Query):
 
    """A Query subclass which optionally loads full results from a Beaker
 
    cache region.
 

	
 
    The CachingQuery stores additional state that allows it to consult
 
    a Beaker cache before accessing the database:
 

	
 
    * A "region", which is a cache region argument passed to a
 
      Beaker CacheManager, specifies a particular cache configuration
 
      (including backend implementation, expiration times, etc.)
 
    * A "namespace", which is a qualifying name that identifies a
 
@@ -166,25 +164,25 @@ def _namespace_from_query(namespace, que
 

	
 
    return namespace
 

	
 

	
 
def _set_cache_parameters(query, region, namespace, cache_key):
 

	
 
    if hasattr(query, '_cache_parameters'):
 
        region, namespace, cache_key = query._cache_parameters
 
        raise ValueError("This query is already configured "
 
                        "for region %r namespace %r" %
 
                        (region, namespace)
 
                    )
 
    query._cache_parameters = region, safe_str(namespace), cache_key
 
    query._cache_parameters = region, namespace, cache_key
 

	
 

	
 
class FromCache(MapperOption):
 
    """Specifies that a Query should load results from a cache."""
 

	
 
    propagate_to_loaders = False
 

	
 
    def __init__(self, region, namespace, cache_key=None):
 
        """Construct a new FromCache.
 

	
 
        :param region: the cache region.  Should be a
 
        region configured in the Beaker CacheManager.
kallithea/lib/hooks.py
Show inline comments
 
@@ -25,35 +25,35 @@ Original author and date, and relevant c
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import time
 

	
 
import mercurial.scmutil
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils import action_logger, make_ui
 
from kallithea.lib.utils2 import HookEnvironmentError, ascii_str, get_hook_environment, safe_bytes, safe_str
 
from kallithea.lib.utils2 import HookEnvironmentError, ascii_str, get_hook_environment, safe_bytes
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.model.db import Repository, User
 

	
 

	
 
def _get_scm_size(alias, root_path):
 
    if not alias.startswith('.'):
 
        alias += '.'
 

	
 
    size_scm, size_root = 0, 0
 
    for path, dirs, files in os.walk(safe_str(root_path)):
 
    for path, dirs, files in os.walk(root_path):
 
        if path.find(alias) != -1:
 
            for f in files:
 
                try:
 
                    size_scm += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 
        else:
 
            for f in files:
 
                try:
 
                    size_root += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 
@@ -309,26 +309,25 @@ def _hook_environment(repo_path):
 

	
 
    path_to_ini_file = extras['config']
 
    kallithea.CONFIG = paste.deploy.appconfig('config:' + path_to_ini_file)
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    kallithea.config.middleware.make_app(kallithea.CONFIG.global_conf, **kallithea.CONFIG.local_conf)
 

	
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
 

	
 
    repo = Repository.get_by_full_path(repo_path)
 
    if not repo:
 
        raise OSError('Repository %s not found in database'
 
                      % (safe_str(repo_path)))
 
        raise OSError('Repository %s not found in database' % repo_path)
 

	
 
    baseui = make_ui()
 
    return baseui, repo
 

	
 

	
 
def handle_git_pre_receive(repo_path, git_stdin_lines):
 
    """Called from Git pre-receive hook"""
 
    # Currently unused. TODO: remove?
 
    return 0
 

	
 

	
 
def handle_git_post_receive(repo_path, git_stdin_lines):
 
@@ -388,14 +387,14 @@ def handle_git_post_receive(repo_path, g
 
        elif _type == 'tags':
 
            git_revs += ['tag=>%s' % push_ref['name']]
 

	
 
    process_pushed_raw_ids(git_revs)
 

	
 
    return 0
 

	
 

	
 
# Almost exactly like Mercurial contrib/hg-ssh:
 
def rejectpush(ui, **kwargs):
 
    """Mercurial hook to be installed as pretxnopen and prepushkey for read-only repos"""
 
    ex = get_hook_environment()
 
    ui.warn(safe_bytes("Push access to %r denied\n" % safe_str(ex.repository)))
 
    ui.warn(safe_bytes("Push access to %r denied\n" % ex.repository))
 
    return 1
kallithea/lib/indexers/daemon.py
Show inline comments
 
@@ -30,25 +30,25 @@ import logging
 
import os
 
import sys
 
import traceback
 
from os.path import dirname
 
from shutil import rmtree
 
from time import mktime
 

	
 
from whoosh.index import create_in, exists_in, open_dir
 
from whoosh.qparser import QueryParser
 

	
 
from kallithea.config.conf import INDEX_EXTENSIONS, INDEX_FILENAMES
 
from kallithea.lib.indexers import CHGSET_IDX_NAME, CHGSETS_SCHEMA, IDX_NAME, SCHEMA
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.lib.utils2 import safe_unicode
 
from kallithea.lib.vcs.exceptions import ChangesetError, NodeDoesNotExistError, RepositoryError
 
from kallithea.model.db import Repository
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
# Add location of top level folder to sys.path
 
project_path = dirname(dirname(dirname(dirname(os.path.realpath(__file__)))))
 
sys.path.append(project_path)
 

	
 

	
 

	
 

	
 
@@ -123,46 +123,43 @@ class WhooshIndexingDaemon(object):
 
        return cs
 

	
 
    def get_paths(self, repo):
 
        """
 
        recursive walk in root dir and return a set of all path in that dir
 
        based on repository walk function
 
        """
 
        index_paths_ = set()
 
        try:
 
            cs = self._get_index_changeset(repo)
 
            for _topnode, _dirs, files in cs.walk('/'):
 
                for f in files:
 
                    index_paths_.add(os.path.join(safe_str(repo.path), safe_str(f.path)))
 
                    index_paths_.add(os.path.join(repo.path, f.path))
 

	
 
        except RepositoryError:
 
            log.debug(traceback.format_exc())
 
            pass
 
        return index_paths_
 

	
 
    def get_node(self, repo, path, index_rev=None):
 
        """
 
        gets a filenode based on given full path. It operates on string for
 
        hg git compatibility.
 
        gets a filenode based on given full path.
 

	
 
        :param repo: scm repo instance
 
        :param path: full path including root location
 
        :return: FileNode
 
        """
 
        # FIXME: paths should be normalized ... or even better: don't include repo.path
 
        path = safe_str(path)
 
        repo_path = safe_str(repo.path)
 
        assert path.startswith(repo_path)
 
        assert path[len(repo_path)] in (os.path.sep, os.path.altsep)
 
        node_path = path[len(repo_path) + 1:]
 
        assert path.startswith(repo.path)
 
        assert path[len(repo.path)] in (os.path.sep, os.path.altsep)
 
        node_path = path[len(repo.path) + 1:]
 
        cs = self._get_index_changeset(repo, index_rev=index_rev)
 
        node = cs.get_node(node_path)
 
        return node
 

	
 
    def is_indexable_node(self, node):
 
        """
 
        Just index the content of chosen files, skipping binary files
 
        """
 
        return (node.extension in INDEX_EXTENSIONS or node.name in INDEX_FILENAMES) and \
 
               not node.is_binary
 

	
 
    def get_node_mtime(self, node):
kallithea/lib/middleware/simplehg.py
Show inline comments
 
@@ -27,25 +27,25 @@ Original author and date, and relevant c
 

	
 
"""
 

	
 

	
 
import logging
 
import os
 
import urllib.parse
 

	
 
import mercurial.hgweb
 

	
 
from kallithea.lib.base import BaseVCSController, get_path_info
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.utils2 import safe_bytes, safe_str
 
from kallithea.lib.utils2 import safe_bytes
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def get_header_hgarg(environ):
 
    """Decode the special Mercurial encoding of big requests over multiple headers.
 
    >>> get_header_hgarg({})
 
    ''
 
    >>> get_header_hgarg({'HTTP_X_HGARG_0': ' ', 'HTTP_X_HGARG_1': 'a','HTTP_X_HGARG_2': '','HTTP_X_HGARG_3': 'b+c %20'})
 
    'ab+c %20'
 
    """
 
@@ -128,22 +128,22 @@ class SimpleHg(BaseVCSController):
 
                                action = 'push'
 
                                break
 
                    else:
 
                        action = cmd_mapping.get(cmd, 'push')
 
                    break # only process one cmd
 

	
 
        return parsed_request
 

	
 
    def _make_app(self, parsed_request):
 
        """
 
        Make an hgweb wsgi application.
 
        """
 
        str_repo_name = safe_str(parsed_request.repo_name)
 
        repo_path = os.path.join(safe_str(self.basepath), str_repo_name)
 
        repo_name = parsed_request.repo_name
 
        repo_path = os.path.join(self.basepath, repo_name)
 
        baseui = make_ui(repo_path=repo_path)
 
        hgweb_app = mercurial.hgweb.hgweb(safe_bytes(repo_path), name=str_repo_name, baseui=baseui)
 
        hgweb_app = mercurial.hgweb.hgweb(safe_bytes(repo_path), name=safe_bytes(repo_name), baseui=baseui)
 

	
 
        def wrapper_app(environ, start_response):
 
            environ['REPO_NAME'] = str_repo_name # used by mercurial.hgweb.hgweb
 
            environ['REPO_NAME'] = repo_name # used by mercurial.hgweb.hgweb
 
            return hgweb_app(environ, start_response)
 

	
 
        return wrapper_app
kallithea/lib/utils.py
Show inline comments
 
@@ -31,25 +31,25 @@ import os
 
import re
 
import sys
 
import traceback
 
from distutils.version import StrictVersion
 

	
 
import beaker.cache
 
import mercurial.config
 
import mercurial.ui
 
from tg.i18n import ugettext as _
 

	
 
import kallithea.config.conf
 
from kallithea.lib.exceptions import HgsubversionImportError
 
from kallithea.lib.utils2 import ascii_bytes, aslist, get_current_authuser, safe_bytes, safe_str
 
from kallithea.lib.utils2 import ascii_bytes, aslist, get_current_authuser, safe_bytes
 
from kallithea.lib.vcs.backends.git.repository import GitRepository
 
from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils.fakemod import create_module
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model import meta
 
from kallithea.model.db import RepoGroup, Repository, Setting, Ui, User, UserGroup, UserLog
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
@@ -165,25 +165,25 @@ def action_logger(user, action, repo, ip
 
        meta.Session().commit()
 

	
 

	
 
def get_filesystem_repos(path):
 
    """
 
    Scans given path for repos and return (name,(type,path)) tuple
 

	
 
    :param path: path to scan for repositories
 
    :param recursive: recursive search and return names with subdirs in front
 
    """
 

	
 
    # remove ending slash for better results
 
    path = safe_str(path.rstrip(os.sep))
 
    path = path.rstrip(os.sep)
 
    log.debug('now scanning in %s', path)
 

	
 
    def isdir(*n):
 
        return os.path.isdir(os.path.join(*n))
 

	
 
    for root, dirs, _files in os.walk(path):
 
        recurse_dirs = []
 
        for subdir in dirs:
 
            # skip removed repos
 
            if REMOVED_REPO_PAT.match(subdir):
 
                continue
 

	
 
@@ -260,43 +260,43 @@ def is_valid_repo(repo_name, base_path, 
 
    """
 
    Returns True if given path is a valid repository False otherwise.
 
    If scm param is given also compare if given scm is the same as expected
 
    from scm parameter
 

	
 
    :param repo_name:
 
    :param base_path:
 
    :param scm:
 

	
 
    :return True: if given path is a valid repository
 
    """
 
    # TODO: paranoid security checks?
 
    full_path = os.path.join(safe_str(base_path), safe_str(repo_name))
 
    full_path = os.path.join(base_path, repo_name)
 

	
 
    try:
 
        scm_ = get_scm(full_path)
 
        if scm:
 
            return scm_[0] == scm
 
        return True
 
    except VCSError:
 
        return False
 

	
 

	
 
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
 
    """
 
    Returns True if given path is a repository group False otherwise
 

	
 
    :param repo_name:
 
    :param base_path:
 
    """
 
    full_path = os.path.join(safe_str(base_path), safe_str(repo_group_name))
 
    full_path = os.path.join(base_path, repo_group_name)
 

	
 
    # check if it's not a repo
 
    if is_valid_repo(repo_group_name, base_path):
 
        return False
 

	
 
    try:
 
        # we need to check bare git repos at higher level
 
        # since we might match branches/hooks/info/objects or possible
 
        # other things inside bare git repo
 
        get_scm(os.path.dirname(full_path))
 
        return False
 
    except VCSError:
kallithea/lib/utils2.py
Show inline comments
 
@@ -320,25 +320,25 @@ def credentials_filter(uri):
 
    return ''.join(uri)
 

	
 

	
 
def get_clone_url(clone_uri_tmpl, prefix_url, repo_name, repo_id, username=None):
 
    parsed_url = urlobject.URLObject(prefix_url)
 
    prefix = urllib.parse.unquote(parsed_url.path.rstrip('/'))
 
    try:
 
        system_user = pwd.getpwuid(os.getuid()).pw_name
 
    except Exception: # TODO: support all systems - especially Windows
 
        system_user = 'kallithea' # hardcoded default value ...
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': urllib.parse.quote(safe_str(username or '')),
 
        'user': urllib.parse.quote(username or ''),
 
        'netloc': parsed_url.netloc + prefix,  # like "hostname:port/prefix" (with optional ":port" and "/prefix")
 
        'prefix': prefix, # undocumented, empty or starting with /
 
        'repo': repo_name,
 
        'repoid': str(repo_id),
 
        'system_user': system_user,
 
        'hostname': parsed_url.hostname,
 
    }
 
    url = re.sub('{([^{}]+)}', lambda m: args.get(m.group(1), m.group(0)), clone_uri_tmpl)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(url)
 
    if not url_obj.username:
 
@@ -552,25 +552,25 @@ class Optional(object):
 
        Extracts value from Optional() instance
 

	
 
        :param val:
 
        :return: original value if it's not Optional instance else
 
            value of instance
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', safe_str(s)).rstrip('_')
 
    return _cleanstringsub('_', s).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
kallithea/lib/vcs/backends/git/changeset.py
Show inline comments
 
@@ -2,37 +2,36 @@ import re
 
from io import BytesIO
 
from itertools import chain
 
from subprocess import PIPE, Popen
 

	
 
from dulwich import objects
 
from dulwich.config import ConfigFile
 

	
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, ChangesetError, ImproperArchiveTypeError, NodeDoesNotExistError, RepositoryError, VCSError
 
from kallithea.lib.vcs.nodes import (
 
    AddedFileNodesGenerator, ChangedFileNodesGenerator, DirNode, FileNode, NodeKind, RemovedFileNodesGenerator, RootNode, SubModuleNode)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, safe_int, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, safe_int, safe_unicode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
class GitChangeset(BaseChangeset):
 
    """
 
    Represents state of the repository at a revision.
 
    """
 

	
 
    def __init__(self, repository, revision):
 
        self._stat_modes = {}
 
        self.repository = repository
 
        revision = safe_str(revision)
 
        try:
 
            commit = self.repository._repo[ascii_bytes(revision)]
 
            if isinstance(commit, objects.Tag):
 
                revision = safe_str(commit.object[1])
 
                commit = self.repository._repo.get_object(commit.object[1])
 
        except KeyError:
 
            raise RepositoryError("Cannot get object with id %s" % revision)
 
        self.raw_id = ascii_str(commit.id)
 
        self.short_id = self.raw_id[:12]
 
        self._commit = commit  # a Dulwich Commmit with .id
 
        self._tree_id = commit.tree
 
        self._committer_property = 'committer'
 
@@ -100,25 +99,24 @@ class GitChangeset(BaseChangeset):
 
        return [b for b in heads if heads[b] == self._commit.id] # FIXME: Inefficient ... and returning None!
 

	
 
    def _fix_path(self, path):
 
        """
 
        Paths are stored without trailing slash so we need to get rid off it if
 
        needed.
 
        """
 
        if path.endswith('/'):
 
            path = path.rstrip('/')
 
        return path
 

	
 
    def _get_id_for_path(self, path):
 
        path = safe_str(path)
 
        # FIXME: Please, spare a couple of minutes and make those codes cleaner;
 
        if path not in self._paths:
 
            path = path.strip('/')
 
            # set root tree
 
            tree = self.repository._repo[self._tree_id]
 
            if path == '':
 
                self._paths[''] = tree.id
 
                return tree.id
 
            splitted = path.split('/')
 
            dirs, name = splitted[:-1], splitted[-1]
 
            curdir = ''
 

	
 
@@ -150,25 +148,25 @@ class GitChangeset(BaseChangeset):
 

	
 
                # cache all items from the given traversed tree
 
                for item, stat, id in tree.items():
 
                    if curdir:
 
                        name = '/'.join((curdir, item))
 
                    else:
 
                        name = item
 
                    self._paths[name] = id
 
                    self._stat_modes[name] = stat
 
            if path not in self._paths:
 
                raise NodeDoesNotExistError("There is no file nor directory "
 
                    "at the given path '%s' at revision %s"
 
                    % (path, safe_str(self.short_id)))
 
                    % (path, self.short_id))
 
        return self._paths[path]
 

	
 
    def _get_kind(self, path):
 
        obj = self.repository._repo[self._get_id_for_path(path)]
 
        if isinstance(obj, objects.Blob):
 
            return NodeKind.FILE
 
        elif isinstance(obj, objects.Tree):
 
            return NodeKind.DIR
 

	
 
    def _get_filectx(self, path):
 
        path = self._fix_path(path)
 
        if self._get_kind(path) != NodeKind.FILE:
 
@@ -243,25 +241,24 @@ class GitChangeset(BaseChangeset):
 
        # Only used to feed diffstat
 
        rev1 = self.parents[0] if self.parents else self.repository.EMPTY_CHANGESET
 
        rev2 = self
 
        return b''.join(self.repository.get_diff(rev1, rev2,
 
                                    ignore_whitespace=ignore_whitespace,
 
                                    context=context))
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        # ensure path is traversed
 
        path = safe_str(path)
 
        self._get_id_for_path(path)
 
        return self._stat_modes[path]
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at given ``path``.
 
        """
 
        id = self._get_id_for_path(path)
 
        blob = self.repository._repo[id]
 
        return blob.as_pretty_string()
 

	
 
    def get_file_size(self, path):
 
@@ -279,33 +276,32 @@ class GitChangeset(BaseChangeset):
 
        return self.get_file_history(path, limit=1)[0]
 

	
 
    def get_file_history(self, path, limit=None):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 

	
 
        TODO: This function now uses os underlying 'git' and 'grep' commands
 
        which is generally not good. Should be replaced with algorithm
 
        iterating commits.
 
        """
 
        self._get_filectx(path)
 
        f_path = safe_str(path)
 

	
 
        if limit is not None:
 
            cmd = ['log', '-n', str(safe_int(limit, 0)),
 
                   '--pretty=format:%H', '-s', self.raw_id, '--', f_path]
 
                   '--pretty=format:%H', '-s', self.raw_id, '--', path]
 

	
 
        else:
 
            cmd = ['log',
 
                   '--pretty=format:%H', '-s', self.raw_id, '--', f_path]
 
                   '--pretty=format:%H', '-s', self.raw_id, '--', path]
 
        so = self.repository.run_git_command(cmd)
 
        ids = re.findall(r'[0-9a-fA-F]{40}', so)
 
        return [self.repository.get_changeset(sha) for sha in ids]
 

	
 
    def get_file_history_2(self, path):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 

	
 
        """
 
        self._get_filectx(path)
 
        from dulwich.walk import Walker
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
@@ -21,25 +21,25 @@ from collections import OrderedDict
 

	
 
import mercurial.url  # import httpbasicauthhandler, httpdigestauthhandler
 
import mercurial.util  # import url as hg_url
 
from dulwich.config import ConfigFile
 
from dulwich.objects import Tag
 
from dulwich.repo import NotGitRepository, Repo
 

	
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (
 
    BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError, TagDoesNotExistError)
 
from kallithea.lib.vcs.utils import ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils import ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_unicode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
 

	
 
from .changeset import GitChangeset
 
from .inmemory import GitInMemoryChangeset
 
from .workdir import GitWorkdir
 

	
 

	
 
SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')
 

	
 
log = logging.getLogger(__name__)
 

	
 
@@ -308,25 +308,24 @@ class GitRepository(BaseRepository):
 
        return self._get_revision(ref_name)
 

	
 
    def _get_archives(self, archive_name='tip'):
 

	
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
            yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall to
 
        filesystem (``file:///``) schema.
 
        """
 
        url = safe_str(url)
 
        if url != 'default' and '://' not in url:
 
            url = ':///'.join(('file', url))
 
        return url
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
kallithea/lib/vcs/backends/git/ssh.py
Show inline comments
 
@@ -9,25 +9,24 @@
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import os
 

	
 
from kallithea.lib.hooks import log_pull_action
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 
from kallithea.lib.vcs.utils import safe_str
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitSshHandler(BaseSshHandler):
 
    vcs_type = 'git'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 
@@ -61,23 +60,23 @@ class GitSshHandler(BaseSshHandler):
 
        return None
 

	
 
    def __init__(self, repo_name, verb):
 
        BaseSshHandler.__init__(self, repo_name)
 
        self.verb = verb
 

	
 
    def _serve(self):
 
        if self.verb == 'git-upload-pack': # action 'pull'
 
            # base class called set_hook_environment - action is hardcoded to 'pull'
 
            log_pull_action(ui=make_ui(), repo=self.db_repo.scm_instance._repo)
 
        else: # probably verb 'git-receive-pack', action 'push'
 
            if not self.allow_push:
 
                self.exit('Push access to %r denied' % safe_str(self.repo_name))
 
                self.exit('Push access to %r denied' % self.repo_name)
 
            # Note: push logging is handled by Git post-receive hook
 

	
 
        # git shell is not a real shell but use shell inspired quoting *inside* the argument.
 
        # Per https://github.com/git/git/blob/v2.22.0/quote.c#L12 :
 
        # The path must be "'" quoted, but "'" and "!" must exit the quoting and be "\" escaped
 
        quoted_abspath = "'%s'" % self.db_repo.repo_full_path.replace("'", r"'\''").replace("!", r"'\!'")
 
        newcmd = ['git', 'shell', '-c', "%s %s" % (self.verb, quoted_abspath)]
 
        log.debug('Serving: %s', newcmd)
 
        os.execvp(newcmd[0], newcmd)
 
        self.exit("Failed to exec 'git' as %s" % newcmd)
kallithea/lib/vcs/backends/hg/changeset.py
Show inline comments
 
import os
 
import posixpath
 

	
 
import mercurial.archival
 
import mercurial.node
 
import mercurial.obsutil
 

	
 
from kallithea.lib.vcs.backends.base import BaseChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, ChangesetError, ImproperArchiveTypeError, NodeDoesNotExistError, VCSError
 
from kallithea.lib.vcs.nodes import (
 
    AddedFileNodesGenerator, ChangedFileNodesGenerator, DirNode, FileNode, NodeKind, RemovedFileNodesGenerator, RootNode, SubModuleNode)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, safe_bytes, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, safe_bytes, safe_unicode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import get_dirs_for_path
 

	
 

	
 
class MercurialChangeset(BaseChangeset):
 
    """
 
    Represents state of the repository at a revision.
 
    """
 

	
 
    def __init__(self, repository, revision):
 
        self.repository = repository
 
        assert isinstance(revision, str), repr(revision)
 
@@ -193,25 +193,25 @@ class MercurialChangeset(BaseChangeset):
 
        # Only used to feed diffstat
 
        return b''.join(self._ctx.diff())
 

	
 
    def _fix_path(self, path):
 
        """
 
        Paths are stored without trailing slash so we need to get rid off it if
 
        needed. Also mercurial keeps filenodes as str so we need to decode
 
        from unicode to str
 
        """
 
        if path.endswith('/'):
 
            path = path.rstrip('/')
 

	
 
        return safe_str(path)
 
        return path
 

	
 
    def _get_kind(self, path):
 
        path = self._fix_path(path)
 
        if path in self._file_paths:
 
            return NodeKind.FILE
 
        elif path in self._dir_paths:
 
            return NodeKind.DIR
 
        else:
 
            raise ChangesetError("Node does not exist at the given path '%s'"
 
                % (path))
 

	
 
    def _get_filectx(self, path):
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -30,25 +30,25 @@ import mercurial.mdiff
 
import mercurial.node
 
import mercurial.patch
 
import mercurial.scmutil
 
import mercurial.sshpeer
 
import mercurial.tags
 
import mercurial.ui
 
import mercurial.url
 
import mercurial.util
 

	
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.exceptions import (
 
    BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError, TagDoesNotExistError, VCSError)
 
from kallithea.lib.vcs.utils import ascii_str, author_email, author_name, date_fromtimestamp, makedate, safe_bytes, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils import ascii_str, author_email, author_name, date_fromtimestamp, makedate, safe_bytes, safe_unicode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 
from .changeset import MercurialChangeset
 
from .inmemory import MercurialInMemoryChangeset
 
from .workdir import MercurialWorkdir
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialRepository(BaseRepository):
 
@@ -437,25 +437,24 @@ class MercurialRepository(BaseRepository
 
            return ascii_str(mercurial.scmutil.revsymbol(self._repo, revision).hex())
 
        except (IndexError, ValueError, mercurial.error.RepoLookupError, TypeError):
 
            msg = "Revision %r does not exist for %s" % (safe_unicode(revision), self.name)
 
            raise ChangesetDoesNotExistError(msg)
 
        except (LookupError, ):
 
            msg = "Ambiguous identifier `%s` for %s" % (safe_unicode(revision), self.name)
 
            raise ChangesetDoesNotExistError(msg)
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns revision number for the given reference.
 
        """
 
        ref_name = safe_str(ref_name)
 
        if ref_type == 'rev' and not ref_name.strip('0'):
 
            return self.EMPTY_CHANGESET
 
        # lookup up the exact node id
 
        _revset_predicates = {
 
                'branch': 'branch',
 
                'book': 'bookmark',
 
                'tag': 'tag',
 
                'rev': 'id',
 
            }
 
        # avoid expensive branch(x) iteration over whole repo
 
        rev_spec = "%%s & %s(%%s)" % _revset_predicates[ref_type]
 
        try:
 
@@ -476,29 +475,27 @@ class MercurialRepository(BaseRepository
 

	
 
    def _get_archives(self, archive_name='tip'):
 
        allowed = self.baseui.configlist(b"web", b"allow_archive",
 
                                         untrusted=True)
 
        for name, ext in [(b'zip', '.zip'), (b'gz', '.tar.gz'), (b'bz2', '.tar.bz2')]:
 
            if name in allowed or self._repo.ui.configbool(b"web",
 
                                                           b"allow" + name,
 
                                                           untrusted=True):
 
                yield {"type": name, "extension": ext, "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall
 
        to filesystem
 
        (``file:///``) schema.
 
        Returns normalized url. If schema is not given, fall back to
 
        filesystem (``file:///``) schema.
 
        """
 
        url = safe_str(url)
 
        if url != 'default' and '://' not in url:
 
            url = "file:" + urllib.request.pathname2url(url)
 
        return url
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``MercurialChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return MercurialChangeset(repository=self, revision=self._get_revision(revision))
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
kallithea/lib/vcs/backends/ssh.py
Show inline comments
 
@@ -16,25 +16,24 @@
 
vcs.backends.ssh
 
~~~~~~~~~~~~~~~~~
 

	
 
SSH backend for all available SCMs
 
"""
 

	
 
import datetime
 
import logging
 
import sys
 

	
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyMiddleware
 
from kallithea.lib.utils2 import set_hook_environment
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.model.db import Repository, User, UserSshKeys
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class BaseSshHandler(object):
 
    # Protocol for setting properties:
 
    # Set by sub class:
 
    #   vcs_type: 'hg' or 'git'
 
    # Set by make() / __init__():
 
@@ -74,25 +73,25 @@ class BaseSshHandler(object):
 
        ssh_key = UserSshKeys.get(key_id)
 
        if ssh_key is None:
 
            self.exit('SSH key %r not found' % key_id)
 
        ssh_key.last_seen = datetime.datetime.now()
 
        Session().commit()
 

	
 
        if HasPermissionAnyMiddleware('repository.write',
 
                                      'repository.admin')(self.authuser, self.repo_name):
 
            self.allow_push = True
 
        elif HasPermissionAnyMiddleware('repository.read')(self.authuser, self.repo_name):
 
            self.allow_push = False
 
        else:
 
            self.exit('Access to %r denied' % safe_str(self.repo_name))
 
            self.exit('Access to %r denied' % self.repo_name)
 

	
 
        self.db_repo = Repository.get_by_repo_name(self.repo_name)
 
        if self.db_repo is None:
 
            self.exit("Repository '%s' not found" % self.repo_name)
 
        assert self.db_repo.repo_name == self.repo_name
 

	
 
        # Set global hook environment up for 'push' actions.
 
        # If pull actions should be served, the actual hook invocation will be
 
        # hardcoded to 'pull' when log_pull_action is invoked (directly on Git,
 
        # or through the Mercurial 'outgoing' hook).
 
        # For push actions, the action in global hook environment is used (in
 
        # handle_git_post_receive when it is called as Git post-receive hook,
kallithea/lib/vcs/nodes.py
Show inline comments
 
@@ -7,25 +7,25 @@
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import functools
 
import mimetypes
 
import posixpath
 
import stat
 

	
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import NodeError, RemovedFileNodeError
 
from kallithea.lib.vcs.utils import safe_bytes, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils import safe_bytes, safe_unicode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
class NodeKind:
 
    SUBMODULE = -1
 
    DIR = 1
 
    FILE = 2
 

	
 

	
 
class NodeState:
 
    ADDED = u'added'
 
    CHANGED = u'changed'
 
@@ -93,25 +93,25 @@ class Node(object):
 
    should use ``FileNode`` and ``DirNode`` subclasses rather than ``Node``
 
    directly.
 

	
 
    Node's ``path`` cannot start with slash as we operate on *relative* paths
 
    only. Moreover, every single node is identified by the ``path`` attribute,
 
    so it cannot end with slash, too. Otherwise, path could lead to mistakes.
 
    """
 

	
 
    def __init__(self, path, kind):
 
        if path.startswith('/'):
 
            raise NodeError("Cannot initialize Node objects with slash at "
 
                            "the beginning as only relative paths are supported")
 
        self.path = safe_str(path.rstrip('/'))  # we store paths as str
 
        self.path = path.rstrip('/')
 
        if path == '' and kind != NodeKind.DIR:
 
            raise NodeError("Only DirNode and its subclasses may be "
 
                            "initialized with empty path")
 
        self.kind = kind
 
        #self.dirs, self.files = [], []
 
        if self.is_root() and not self.is_dir():
 
            raise NodeError("Root node cannot be FILE kind")
 

	
 
    @LazyProperty
 
    def parent(self):
 
        parent_path = self.get_parent_path()
 
        if parent_path:
 
@@ -583,25 +583,25 @@ class SubModuleNode(Node):
 
    represents a SubModule of Git or SubRepo of Mercurial
 
    """
 
    is_binary = False
 
    size = 0
 

	
 
    def __init__(self, name, url, changeset=None, alias=None):
 
        # Note: Doesn't call Node.__init__!
 
        self.path = name
 
        self.kind = NodeKind.SUBMODULE
 
        self.alias = alias
 
        # we have to use emptyChangeset here since this can point to svn/git/hg
 
        # submodules we cannot get from repository
 
        self.changeset = EmptyChangeset(str(changeset), alias=alias)
 
        self.changeset = EmptyChangeset(changeset, alias=alias)
 
        self.url = url
 

	
 
    def __repr__(self):
 
        return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
 
                                 getattr(self.changeset, 'short_id', ''))
 

	
 
    @LazyProperty
 
    def name(self):
 
        """
 
        Returns name of the node so if its path
 
        then only last part is returned.
 
        """
kallithea/lib/vcs/utils/__init__.py
Show inline comments
 
@@ -195,25 +195,25 @@ def author_email(author):
 
    if not author:
 
        return ''
 

	
 
    l = author.find('<') + 1
 
    if l != 0:
 
        r = author.find('>', l)
 
        if r != -1:
 
            author = author[l:r]
 

	
 
    m = email_re.search(author)
 
    if m is None:
 
        return ''
 
    return safe_str(m.group(0))
 
    return m.group(0)
 

	
 

	
 
def author_name(author):
 
    """
 
    get name of author, or else username.
 
    It'll try to find an email in the author string and just cut it off
 
    to get the username
 
    """
 
    if not author:
 
        return ''
 
    if '@' not in author:
 
        return author
kallithea/model/db.py
Show inline comments
 
@@ -40,25 +40,25 @@ import sqlalchemy
 
from beaker.cache import cache_region, region_invalidate
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import (
 
    Optional, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, safe_unicode, str2bool, urlreadable)
 
    Optional, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_unicode, str2bool, urlreadable)
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.meta import Base, Session
 

	
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
@@ -1415,25 +1415,25 @@ class Repository(Base, BaseDbModel):
 
            return self.scm_instance_no_cache()
 
        rn = self.repo_name
 

	
 
        valid = CacheInvalidation.test_and_set_valid(rn, None, valid_cache_keys=valid_cache_keys)
 
        if not valid:
 
            log.debug('Cache for %s invalidated, getting new object', rn)
 
            region_invalidate(_c, None, 'scm_instance_cached', rn)
 
        else:
 
            log.debug('Trying to get scm_instance of %s from cache', rn)
 
        return _c(rn)
 

	
 
    def scm_instance_no_cache(self):
 
        repo_full_path = safe_str(self.repo_full_path)
 
        repo_full_path = self.repo_full_path
 
        alias = get_scm(repo_full_path)[0]
 
        log.debug('Creating instance of %s repository from %s',
 
                  alias, self.repo_full_path)
 
        backend = get_backend(alias)
 

	
 
        if alias == 'hg':
 
            repo = backend(repo_full_path, create=False,
 
                           baseui=self._ui)
 
        else:
 
            repo = backend(repo_full_path, create=False)
 

	
 
        return repo
 
@@ -2082,29 +2082,29 @@ class CacheInvalidation(Base, BaseDbMode
 
        key must / will start with a repo_name which will be stored in .cache_args .
 
        """
 
        prefix = kallithea.CONFIG.get('instance_id', '')
 
        return "%s%s" % (prefix, key)
 

	
 
    @classmethod
 
    def set_invalidate(cls, repo_name):
 
        """
 
        Mark all caches of a repo as invalid in the database.
 
        """
 
        inv_objs = Session().query(cls).filter(cls.cache_args == repo_name).all()
 
        log.debug('for repo %s got %s invalidation objects',
 
                  safe_str(repo_name), inv_objs)
 
                  repo_name, inv_objs)
 

	
 
        for inv_obj in inv_objs:
 
            log.debug('marking %s key for invalidation based on repo_name=%s',
 
                      inv_obj, safe_str(repo_name))
 
                      inv_obj, repo_name)
 
            Session().delete(inv_obj)
 
        Session().commit()
 

	
 
    @classmethod
 
    def test_and_set_valid(cls, repo_name, kind, valid_cache_keys=None):
 
        """
 
        Mark this cache key as active and currently cached.
 
        Return True if the existing cache registration still was valid.
 
        Return False to indicate that it had been invalidated and caches should be refreshed.
 
        """
 

	
 
        key = (repo_name + '_' + kind) if kind else repo_name
 
@@ -2508,25 +2508,25 @@ class Gist(Base, BaseDbModel):
 

	
 
    def __json__(self):
 
        data = dict(
 
        )
 
        data.update(self.get_api_data())
 
        return data
 
    ## SCM functions
 

	
 
    @property
 
    def scm_instance(self):
 
        from kallithea.lib.vcs import get_repo
 
        base_path = self.base_path()
 
        return get_repo(os.path.join(safe_str(base_path), safe_str(self.gist_access_id)))
 
        return get_repo(os.path.join(base_path, self.gist_access_id))
 

	
 

	
 
class UserSshKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_ssh_keys'
 
    __table_args__ = (
 
        Index('usk_fingerprint_idx', 'fingerprint'),
 
        UniqueConstraint('fingerprint'),
 
        _table_args_default_dict
 
    )
 
    __mapper_args__ = {}
 

	
 
    user_ssh_key_id = Column(Integer(), primary_key=True)
kallithea/model/repo.py
Show inline comments
 
@@ -30,25 +30,25 @@ import logging
 
import os
 
import shutil
 
import traceback
 
from datetime import datetime
 

	
 
import kallithea.lib.utils2
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevel, HasUserGroupPermissionLevel
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.lib.hooks import log_delete_repository
 
from kallithea.lib.utils import is_valid_repo_uri, make_ui
 
from kallithea.lib.utils2 import LazyProperty, get_current_authuser, obfuscate_url_pw, remove_prefix, safe_str
 
from kallithea.lib.utils2 import LazyProperty, get_current_authuser, obfuscate_url_pw, remove_prefix
 
from kallithea.lib.vcs.backends import get_backend
 
from kallithea.model.db import (
 
    Permission, RepoGroup, Repository, RepositoryField, Session, Statistics, Ui, User, UserGroup, UserGroupRepoGroupToPerm, UserGroupRepoToPerm, UserRepoGroupToPerm, UserRepoToPerm)
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoModel(object):
 

	
 
    URL_SEPARATOR = Repository.url_sep()
 

	
 
@@ -632,26 +632,25 @@ class RepoModel(object):
 
        if '/' in repo_name:
 
            raise ValueError('repo_name must not contain groups got `%s`' % repo_name)
 

	
 
        if isinstance(repo_group, RepoGroup):
 
            new_parent_path = os.sep.join(repo_group.full_path_splitted)
 
        else:
 
            new_parent_path = repo_group or ''
 

	
 
        if repo_store_location:
 
            _paths = [repo_store_location]
 
        else:
 
            _paths = [self.repos_path, new_parent_path, repo_name]
 
            # we need to make it str for mercurial
 
        repo_path = os.path.join(*(safe_str(x) for x in _paths))
 
        repo_path = os.path.join(*_paths)
 

	
 
        # check if this path is not a repository
 
        if is_valid_repo(repo_path, self.repos_path):
 
            raise Exception('This path %s is a valid repository' % repo_path)
 

	
 
        # check if this path is a group
 
        if is_valid_repo_group(repo_path, self.repos_path):
 
            raise Exception('This path %s is a valid group' % repo_path)
 

	
 
        log.info('creating repo %s in %s from url: `%s`',
 
            repo_name, repo_path,
 
            obfuscate_url_pw(clone_uri))
 
@@ -677,42 +676,42 @@ class RepoModel(object):
 
                  repo_name, repo_type)
 
        return repo
 

	
 
    def _rename_filesystem_repo(self, old, new):
 
        """
 
        renames repository on filesystem
 

	
 
        :param old: old name
 
        :param new: new name
 
        """
 
        log.info('renaming repo from %s to %s', old, new)
 

	
 
        old_path = safe_str(os.path.join(self.repos_path, old))
 
        new_path = safe_str(os.path.join(self.repos_path, new))
 
        old_path = os.path.join(self.repos_path, old)
 
        new_path = os.path.join(self.repos_path, new)
 
        if os.path.isdir(new_path):
 
            raise Exception(
 
                'Was trying to rename to already existing dir %s' % new_path
 
            )
 
        shutil.move(old_path, new_path)
 

	
 
    def _delete_filesystem_repo(self, repo):
 
        """
 
        removes repo from filesystem, the removal is actually done by
 
        renaming dir to a 'rm__*' prefix which Kallithea will skip.
 
        It can be undeleted later by reverting the rename.
 

	
 
        :param repo: repo object
 
        """
 
        rm_path = safe_str(os.path.join(self.repos_path, repo.repo_name))
 
        rm_path = os.path.join(self.repos_path, repo.repo_name)
 
        log.info("Removing %s", rm_path)
 

	
 
        _now = datetime.now()
 
        _ms = str(_now.microsecond).rjust(6, '0')
 
        _d = 'rm__%s__%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                             repo.just_name)
 
        if repo.group:
 
            args = repo.group.full_path_splitted + [_d]
 
            _d = os.path.join(*args)
 
        if os.path.exists(rm_path):
 
            shutil.move(rm_path, safe_str(os.path.join(self.repos_path, _d)))
 
            shutil.move(rm_path, os.path.join(self.repos_path, _d))
 
        else:
 
            log.error("Can't find repo to delete in %r", rm_path)
kallithea/model/scm.py
Show inline comments
 
@@ -32,25 +32,25 @@ import re
 
import sys
 
import traceback
 

	
 
import pkg_resources
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea import BACKENDS
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel, HasUserGroupPermissionLevel
 
from kallithea.lib.exceptions import IMCCommitError, NonRelativePathError
 
from kallithea.lib.hooks import process_pushed_raw_ids
 
from kallithea.lib.utils import action_logger, get_filesystem_repos, make_ui
 
from kallithea.lib.utils2 import safe_bytes, safe_str, set_hook_environment
 
from kallithea.lib.utils2 import safe_bytes, set_hook_environment
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.db import PullRequest, RepoGroup, Repository, Session, Ui, User, UserFollowing, UserLog
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserTemp(object):
 
@@ -181,25 +181,25 @@ class ScmModel(object):
 
            # since this is internal storage separator for kallithea
 
            name = Repository.normalize_repo_name(name)
 

	
 
            try:
 
                if name in repos:
 
                    raise RepositoryError('Duplicate repository name %s '
 
                                          'found in %s' % (name, path))
 
                else:
 

	
 
                    klass = get_backend(path[0])
 

	
 
                    if path[0] == 'hg' and path[0] in BACKENDS:
 
                        repos[name] = klass(safe_str(path[1]), baseui=baseui)
 
                        repos[name] = klass(path[1], baseui=baseui)
 

	
 
                    if path[0] == 'git' and path[0] in BACKENDS:
 
                        repos[name] = klass(path[1])
 
            except OSError:
 
                continue
 
        log.debug('found %s paths with repositories', len(repos))
 
        return repos
 

	
 
    def get_repos(self, repos):
 
        """Return the repos the user has access to"""
 
        return RepoList(repos, perm_level='read')
 

	
 
@@ -387,31 +387,26 @@ class ScmModel(object):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def commit_change(self, repo, repo_name, cs, user, ip_addr, author, message,
 
                      content, f_path):
 
        """
 
        Commit a change to a single file
 

	
 
        :param repo: a db_repo.scm_instance
 
        """
 
        user = User.guess_instance(user)
 
        IMC = self._get_IMC_module(repo.alias)
 

	
 
        # decoding here will force that we have proper encoded values
 
        # in any other case this will throw exceptions and deny commit
 
        content = safe_str(content)
 
        path = safe_str(f_path)
 
        imc = IMC(repo)
 
        imc.change(FileNode(path, content, mode=cs.get_file_mode(f_path)))
 
        imc.change(FileNode(f_path, content, mode=cs.get_file_mode(f_path)))
 
        try:
 
            tip = imc.commit(message=message, author=author,
 
                             parents=[cs], branch=cs.branch)
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            # clear caches - we also want a fresh object if commit fails
 
            self.mark_for_invalidation(repo_name)
 
            raise IMCCommitError(str(e))
 
        self._handle_push(repo,
 
                          username=user.username,
 
                          ip_addr=ip_addr,
 
                          action='push_local',
 
@@ -469,30 +464,25 @@ class ScmModel(object):
 
        :param trigger_push_hook: trigger push hooks
 

	
 
        :returns: new committed changeset
 
        """
 

	
 
        user = User.guess_instance(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        processed_nodes = []
 
        for f_path in nodes:
 
            content = nodes[f_path]['content']
 
            f_path = self._sanitize_path(f_path)
 
            f_path = safe_str(f_path)
 
            # decoding here will force that we have proper encoded values
 
            # in any other case this will throw exceptions and deny commit
 
            if isinstance(content, (str,)):
 
                content = safe_str(content)
 
            else:
 
            if not isinstance(content, str) and not isinstance(content, bytes):
 
                content = content.read()
 
            processed_nodes.append((f_path, content))
 

	
 
        message = message
 
        committer = user.full_contact
 
        if not author:
 
            author = committer
 

	
 
        IMC = self._get_IMC_module(scm_instance.alias)
 
        imc = IMC(scm_instance)
 

	
 
        if not parent_cs:
kallithea/tests/base.py
Show inline comments
 
@@ -13,25 +13,25 @@
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import logging
 
import os
 
import re
 
import tempfile
 
import time
 

	
 
import pytest
 
from webtest import TestApp
 

	
 
from kallithea.lib.utils2 import ascii_str, safe_str
 
from kallithea.lib.utils2 import ascii_str
 
from kallithea.model.db import User
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
skipif = pytest.mark.skipif
 
parametrize = pytest.mark.parametrize
 

	
 
# Hack: These module global values MUST be set to actual values before running any tests. This is currently done by conftest.py.
 
url = None
 
testapp = None
 

	
 
@@ -171,25 +171,24 @@ class TestController(object):
 
    def assert_authenticated_user(self, response, expected_username):
 
        cookie = response.session.get('authuser')
 
        user = cookie and cookie.get('user_id')
 
        user = user and User.get(user)
 
        user = user and user.username
 
        assert user == expected_username
 

	
 
    def session_csrf_secret_token(self):
 
        return ascii_str(self.app.get(url('session_csrf_secret_token')).body)
 

	
 
    def checkSessionFlash(self, response, msg=None, skip=0, _matcher=lambda msg, m: msg in m):
 
        if 'flash' not in response.session:
 
            pytest.fail(safe_str(u'msg `%s` not found - session has no flash:\n%s' % (msg, response)))
 
            pytest.fail(u'msg `%s` not found - session has no flash:\n%s' % (msg, response))
 
        try:
 
            level, m = response.session['flash'][-1 - skip]
 
            if _matcher(msg, m):
 
                return
 
        except IndexError:
 
            pass
 
        pytest.fail(safe_str(u'msg `%s` not found in session flash (skipping %s): %s' %
 
                           (msg, skip,
 
                            ', '.join('`%s`' % m for level, m in response.session['flash']))))
 
        pytest.fail(u'msg `%s` not found in session flash (skipping %s): %s' %
 
                    (msg, skip, ', '.join('`%s`' % m for level, m in response.session['flash'])))
 

	
 
    def checkSessionFlashRegex(self, response, regex, skip=0):
 
        self.checkSessionFlash(response, regex, skip=skip, _matcher=re.search)
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import os
 
import urllib.parse
 

	
 
import mock
 
import pytest
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.model.db import Permission, RepoGroup, Repository, Ui, User, UserRepoToPerm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture, error_function
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
@@ -65,25 +64,25 @@ class _BaseTestCase(base.TestController)
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        assert new_repo.repo_name == repo_name
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 
            vcs.get_repo(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name)
 
        Session().commit()
 

	
 
    def test_case_insensitivity(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = u'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                                 fixture._get_repo_create_params(repo_private=False,
 
@@ -140,25 +139,25 @@ class _BaseTestCase(base.TestController)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        assert len(inherited_perms) == 1
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
            vcs.get_repo(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_in_group_without_needed_permissions(self):
 
        usr = self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
 
        # avoid spurious RepoGroup DetachedInstanceError ...
 
@@ -232,25 +231,25 @@ class _BaseTestCase(base.TestController)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        assert len(inherited_perms) == 1
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
            vcs.get_repo(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        RepoGroupModel().delete(group_name_allowed)
 
        Session().commit()
 

	
 
    def test_create_in_group_inherit_permissions(self):
 
        self.log_user()
 
@@ -289,25 +288,25 @@ class _BaseTestCase(base.TestController)
 
        new_repo_id = new_repo.repo_id
 

	
 
        assert new_repo.repo_name == repo_name_full
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
            vcs.get_repo(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        # check if inherited permissiona are applied
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        assert len(inherited_perms) == 2
 

	
 
        assert base.TEST_USER_REGULAR_LOGIN in [x.user.username
 
                                                    for x in inherited_perms]
 
@@ -364,47 +363,47 @@ class _BaseTestCase(base.TestController)
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        assert new_repo.repo_name == repo_name
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 
            vcs.get_repo(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.post(base.url('delete_repo', repo_name=repo_name),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
 

	
 
        response.follow()
 

	
 
        # check if repo was deleted from db
 
        deleted_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        assert deleted_repo is None
 

	
 
        assert os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)) == False
 

	
 
    def test_delete_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (safe_str(self.NEW_REPO), non_ascii)
 
        repo_name = "%s%s" % (self.NEW_REPO, non_ascii)
 
        description = 'description for newly created repo' + non_ascii
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name))
 
        assert response.json == {u'result': True}
 
        self.checkSessionFlash(response,
 
                               u'Created repository <a href="/%s">%s</a>'
kallithea/tests/functional/test_forks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import urllib.parse
 

	
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.model.db import Repository, User
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class _BaseTestCase(base.TestController):
 
@@ -135,46 +134,46 @@ class _BaseTestCase(base.TestController)
 
        response.mustcontain(self.REPO_TYPE)
 
        response.mustcontain('Fork of "<a href="/%s">%s</a>"' % (repo_name, repo_name))
 

	
 
        fixture.destroy_repo(fork_name_full)
 
        fixture.destroy_repo_group(group_id)
 

	
 
    def test_fork_unicode(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        fork_name = safe_str(self.REPO_FORK + u'-rødgrød')
 
        fork_name = self.REPO_FORK + u'-rødgrød'
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': u'-1',
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': 'unicode repo 1',
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        self.app.post(base.url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 
        response = self.app.get(base.url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (urllib.parse.quote(fork_name), fork_name)
 
        )
 
        fork_repo = Repository.get_by_repo_name(fork_name)
 
        assert fork_repo
 

	
 
        # fork the fork
 
        fork_name_2 = safe_str(self.REPO_FORK + u'-blåbærgrød')
 
        fork_name_2 = self.REPO_FORK + u'-blåbærgrød'
 
        creation_args = {
 
            'repo_name': fork_name_2,
 
            'repo_group': u'-1',
 
            'fork_parent_id': fork_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': 'unicode repo 2',
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        self.app.post(base.url(controller='forks', action='fork_create',
 
                          repo_name=fork_name), creation_args)
 
        response = self.app.get(base.url(controller='forks', action='forks',
kallithea/tests/vcs/test_hg.py
Show inline comments
 
import os
 

	
 
import mock
 
import pytest
 

	
 
from kallithea.lib.vcs.backends.hg import MercurialChangeset, MercurialRepository
 
from kallithea.lib.vcs.exceptions import NodeDoesNotExistError, RepositoryError, VCSError
 
from kallithea.lib.vcs.nodes import NodeKind, NodeState
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_HG_REPO_CLONE, TEST_HG_REPO_PULL, TESTS_TMP_PATH
 

	
 

	
 
class TestMercurialRepository(object):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_HG_REPO_CLONE):
 
            pytest.fail('Cannot test mercurial clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_HG_REPO_CLONE)
 

	
 
    def setup_method(self):
 
        self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
 
        self.repo = MercurialRepository(TEST_HG_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = os.path.join(TESTS_TMP_PATH, 'errorrepo')
 
        with pytest.raises(RepositoryError):
 
            MercurialRepository(wrong_repo_path)
 

	
 
    def test_unicode_path_repo(self):
 
        with pytest.raises(VCSError):
 
            MercurialRepository(u'iShouldFail')
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = MercurialRepository(safe_str(TEST_HG_REPO))
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        assert len(repo.revisions) == len(repo_clone.revisions)
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            assert raw_id == repo_clone.get_changeset(raw_id).raw_id
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = MercurialRepository(safe_str(TEST_HG_REPO))
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update',
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        assert len(repo.revisions) == len(repo_clone.revisions)
 

	
 
        # check if current workdir was updated
 
        assert os.path.isfile(
 
            os.path.join(
 
                TEST_HG_REPO_CLONE + '_w_update', 'MANIFEST.in'
 
            )
 
        )
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = MercurialRepository(safe_str(TEST_HG_REPO))
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update',
 
            src_url=TEST_HG_REPO, update_after_clone=False)
 
        assert len(repo.revisions) == len(repo_clone.revisions)
 
        assert not os.path.isfile(
 
            os.path.join(
 
                TEST_HG_REPO_CLONE + '_wo_update', 'MANIFEST.in'
 
            )
 
        )
 

	
 
    def test_pull(self):
 
        if os.path.exists(TEST_HG_REPO_PULL):
 
            pytest.fail('Cannot test mercurial pull command as location %s '
 
@@ -247,25 +246,25 @@ TODO: To be written...
 
    def test_get_diff_sanitizes_negative_context(self, mock_diffopts):
 
        negative_context = -10
 
        zero_context = 0
 

	
 
        self.repo.get_diff(0, 1, 'foo', context=negative_context)
 

	
 
        mock_diffopts.assert_called_once_with(git=True, showfunc=True, ignorews=False, context=zero_context)
 

	
 

	
 
class TestMercurialChangeset(object):
 

	
 
    def setup_method(self):
 
        self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
 
        self.repo = MercurialRepository(TEST_HG_REPO)
 

	
 
    def _test_equality(self, changeset):
 
        revision = changeset.revision
 
        assert changeset == self.repo.get_changeset(revision)
 

	
 
    def test_equality(self):
 
        revs = [0, 10, 20]
 
        changesets = [self.repo.get_changeset(rev) for rev in revs]
 
        for changeset in changesets:
 
            self._test_equality(changeset)
 

	
 
    def test_default_changeset(self):
kallithea/tests/vcs/test_vcs.py
Show inline comments
 
import os
 
import shutil
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs import VCSError, get_backend, get_repo
 
from kallithea.lib.vcs.backends.hg import MercurialRepository
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.tests.vcs.conf import TEST_GIT_REPO, TEST_HG_REPO, TESTS_TMP_PATH
 

	
 

	
 
class TestVCS(object):
 
    """
 
    Tests for main module's methods.
 
    """
 

	
 
    def test_get_backend(self):
 
        hg = get_backend('hg')
 
        assert hg == MercurialRepository
 

	
 
    def test_alias_detect_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 
        repo = backend(path)
 
        assert 'hg' == repo.alias
 

	
 
    def test_alias_detect_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 
        repo = backend(path)
 
        assert 'git' == repo.alias
 

	
 
    def test_wrong_alias(self):
 
        alias = 'wrong_alias'
 
        with pytest.raises(VCSError):
 
            get_backend(alias)
 

	
 
    def test_get_repo(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 
        repo = backend(path)
 

	
 
        assert repo.__class__ == get_repo(safe_str(path), alias).__class__
 
        assert repo.path == get_repo(safe_str(path), alias).path
 
        assert repo.__class__ == get_repo(path, alias).__class__
 
        assert repo.path == get_repo(path, alias).path
 

	
 
    def test_get_repo_autoalias_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 
        repo = backend(path)
 

	
 
        assert repo.__class__ == get_repo(safe_str(path)).__class__
 
        assert repo.path == get_repo(safe_str(path)).path
 
        assert repo.__class__ == get_repo(path).__class__
 
        assert repo.path == get_repo(path).path
 

	
 
    def test_get_repo_autoalias_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 
        repo = backend(path)
 

	
 
        assert repo.__class__ == get_repo(safe_str(path)).__class__
 
        assert repo.path == get_repo(safe_str(path)).path
 
        assert repo.__class__ == get_repo(path).__class__
 
        assert repo.path == get_repo(path).path
 

	
 
    def test_get_repo_err(self):
 
        blank_repo_path = os.path.join(TESTS_TMP_PATH, 'blank-error-repo')
 
        if os.path.isdir(blank_repo_path):
 
            shutil.rmtree(blank_repo_path)
 

	
 
        os.mkdir(blank_repo_path)
 
        with pytest.raises(VCSError):
 
            get_repo(blank_repo_path)
 
        with pytest.raises(VCSError):
 
            get_repo(blank_repo_path + 'non_existing')
 

	
0 comments (0 inline, 0 general)