Changeset - 431689d7f37d
[Not reviewed]
default
0 41 0
Søren Løvborg - 10 years ago 2015-08-31 17:42:57
sorenl@unity3d.com
remove vestiges of Python 2.5 support

We only support Python 2.6 and 2.7; hence we do not need to import
with-statement support from __future__.
41 files changed with 0 insertions and 41 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_api.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.kallithea_api
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Api CLI client for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 3, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import sys
 
import argparse
 

	
 
from kallithea.bin.base import json, api_call, RcConf, FORMAT_JSON, FORMAT_PRETTY
 

	
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-api [-h] [--format=FORMAT] [--apikey=APIKEY] [--apihost=APIHOST] "
 
      "[--config=CONFIG] [--save-config] "
 
      "METHOD <key:val> <key2:val> ...\n"
 
      "Create config file: kallithea-api --apikey=<key> --apihost=http://your.kallithea.server --save-config"
 
    )
 

	
 
    parser = argparse.ArgumentParser(description='Kallithea API cli',
 
                                     usage=usage)
 

	
 
    ## config
 
    group = parser.add_argument_group('config')
 
    group.add_argument('--apikey', help='api access key')
 
    group.add_argument('--apihost', help='api host')
 
    group.add_argument('--config', help='config file')
 
    group.add_argument('--save-config', action='store_true', help='save the given config into a file')
 

	
 
    group = parser.add_argument_group('API')
 
    group.add_argument('method', metavar='METHOD', nargs='?', type=str, default=None,
 
            help='API method name to call followed by key:value attributes',
 
    )
 
    group.add_argument('--format', dest='format', type=str,
 
            help='output format default: `%s` can '
 
                 'be also `%s`' % (FORMAT_PRETTY, FORMAT_JSON),
 
            default=FORMAT_PRETTY
 
    )
 
    args, other = parser.parse_known_args()
 
    return parser, args, other
 

	
 

	
 
def main(argv=None):
 
    """
 
    Main execution function for cli
 

	
 
    :param argv:
 
    """
 
    if argv is None:
 
        argv = sys.argv
 

	
 
    conf = None
 
    parser, args, other = argparser(argv)
 

	
 
    api_credentials_given = (args.apikey and args.apihost)
 
    if args.save_config:
 
        if not api_credentials_given:
 
            raise parser.error('--save-config requires --apikey and --apihost')
 
        conf = RcConf(config_location=args.config,
 
                      autocreate=True, config={'apikey': args.apikey,
 
                                               'apihost': args.apihost})
 
        sys.exit()
 

	
 
    if not conf:
 
        conf = RcConf(config_location=args.config, autoload=True)
 
        if not conf:
 
            if not api_credentials_given:
 
                parser.error('Could not find config file and missing '
 
                             '--apikey or --apihost in params')
 

	
 
    apikey = args.apikey or conf['apikey']
 
    apihost = args.apihost or conf['apihost']
 
    method = args.method
 

	
 
    # if we don't have method here it's an error
 
    if not method:
 
        parser.error('Please specify method name')
 

	
 
    try:
 
        margs = dict(map(lambda s: s.split(':', 1), other))
 
    except ValueError:
 
        sys.stderr.write('Error parsing arguments \n')
 
        sys.exit()
 
    if args.format == FORMAT_PRETTY:
 
        print 'Calling method %s => %s' % (method, apihost)
 

	
 
    json_resp = api_call(apikey, apihost, method, **margs)
 
    error_prefix = ''
 
    if json_resp['error']:
 
        error_prefix = 'ERROR:'
 
        json_data = json_resp['error']
 
    else:
 
        json_data = json_resp['result']
 
    if args.format == FORMAT_JSON:
 
        print json.dumps(json_data)
 
    elif args.format == FORMAT_PRETTY:
 
        print 'Server response \n%s%s' % (
 
            error_prefix, json.dumps(json_data, indent=4, sort_keys=True)
 
        )
 
    return 0
 

	
kallithea/bin/kallithea_config.py
Show inline comments
 
#!/usr/bin/env python2
 

	
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.kallithea_config
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
configuration generator for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 18, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
from __future__ import with_statement
 
import os
 
import sys
 
import uuid
 
import argparse
 
from mako.template import Template
 
TMPL = 'template.ini.mako'
 
here = os.path.dirname(os.path.abspath(__file__))
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-config [-h] [--filename=FILENAME] [--template=TEMPLATE] \n"
 
      "VARS optional specify extra template variable that will be available in "
 
      "template. Use comma separated key=val format eg.\n"
 
      "key1=val1,port=5000,host=127.0.0.1,elements='a\,b\,c'\n"
 
    )
 

	
 
    parser = argparse.ArgumentParser(
 
        description='Kallithea CONFIG generator with variable replacement',
 
        usage=usage
 
    )
 

	
 
    ## config
 
    group = parser.add_argument_group('CONFIG')
 
    group.add_argument('--filename', help='Output ini filename.')
 
    group.add_argument('--template', help='Mako template file to use instead of '
 
                                          'the default builtin template')
 
    group.add_argument('--raw', help='Store given mako template as raw without '
 
                                     'parsing. Use this to create custom template '
 
                                     'initially', action='store_true')
 
    group.add_argument('--show-defaults', help='Show all default variables for '
 
                                               'builtin template', action='store_true')
 
    args, other = parser.parse_known_args()
 
    return parser, args, other
 

	
 

	
 
def _escape_split(text, sep):
 
    """
 
    Allows for escaping of the separator: e.g. arg='foo\, bar'
 

	
 
    It should be noted that the way bash et. al. do command line parsing, those
 
    single quotes are required. a shameless ripoff from fabric project.
 

	
 
    """
 
    escaped_sep = r'\%s' % sep
 

	
 
    if escaped_sep not in text:
 
        return text.split(sep)
 

	
 
    before, _, after = text.partition(escaped_sep)
 
    startlist = before.split(sep)  # a regular split is fine here
 
    unfinished = startlist[-1]
 
    startlist = startlist[:-1]
 

	
 
    # recurse because there may be more escaped separators
 
    endlist = _escape_split(after, sep)
 

	
 
    # finish building the escaped value. we use endlist[0] becaue the first
 
    # part of the string sent in recursion is the rest of the escaped value.
 
    unfinished += sep + endlist[0]
 

	
 
    return startlist + [unfinished] + endlist[1:]  # put together all the parts
 

	
 
def _run(argv):
 
    parser, args, other = argparser(argv)
 
    if not len(sys.argv) > 1:
 
        print parser.print_help()
 
        sys.exit(0)
 
    # defaults that can be overwritten by arguments
 
    tmpl_stored_args = {
 
        'http_server': 'waitress',
 
        'lang': 'en',
 
        'database_engine': 'sqlite',
 
        'host': '127.0.0.1',
 
        'port': 5000,
 
        'error_aggregation_service': None,
 
    }
 
    if other:
 
        # parse arguments, we assume only first is correct
 
        kwargs = {}
 
        for el in _escape_split(other[0], ','):
 
            kv = _escape_split(el, '=')
 
            if len(kv) == 2:
 
                k, v = kv
 
                kwargs[k] = v
 
        # update our template stored args
 
        tmpl_stored_args.update(kwargs)
 

	
 
    # use default that cannot be replaced
 
    tmpl_stored_args.update({
 
        'uuid': lambda: uuid.uuid4().hex,
 
        'here': os.path.abspath(os.curdir),
 
    })
 
    if args.show_defaults:
 
        for k,v in tmpl_stored_args.iteritems():
 
            print '%s=%s' % (k, v)
 
        sys.exit(0)
kallithea/bin/kallithea_gist.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.kallithea_gist
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Gist CLI client for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import sys
 
import stat
 
import argparse
 
import fileinput
 

	
 
from kallithea.bin.base import json, api_call, RcConf, FORMAT_JSON, FORMAT_PRETTY
 

	
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-gist [-h] [--format=FORMAT] [--apikey=APIKEY] [--apihost=APIHOST] "
 
      "[--config=CONFIG] [--save-config] [GIST OPTIONS] "
 
      "[filename or stdin use - for terminal stdin ]\n"
 
      "Create config file: kallithea-gist --apikey=<key> --apihost=http://your.kallithea.server --save-config"
 
    )
 

	
 
    parser = argparse.ArgumentParser(description='Kallithea Gist cli',
 
                                     usage=usage)
 

	
 
    ## config
 
    group = parser.add_argument_group('config')
 
    group.add_argument('--apikey', help='api access key')
 
    group.add_argument('--apihost', help='api host')
 
    group.add_argument('--config', help='config file path DEFAULT: ~/.config/kallithea')
 
    group.add_argument('--save-config', action='store_true',
 
                       help='save the given config into a file')
 

	
 
    group = parser.add_argument_group('GIST')
 
    group.add_argument('-p', '--private', action='store_true',
 
                       help='create private Gist')
 
    group.add_argument('-f', '--filename',
 
                       help='set uploaded gist filename, '
 
                            'also defines syntax highlighting')
 
    group.add_argument('-d', '--description', help='Gist description')
 
    group.add_argument('-l', '--lifetime', metavar='MINUTES',
 
                       help='gist lifetime in minutes, -1 (DEFAULT) is forever')
 
    group.add_argument('--format', dest='format', type=str,
 
                       help='output format DEFAULT: `%s` can '
 
                       'be also `%s`' % (FORMAT_PRETTY, FORMAT_JSON),
 
            default=FORMAT_PRETTY
 
    )
 
    args, other = parser.parse_known_args()
 
    return parser, args, other
 

	
 

	
 
def _run(argv):
 
    conf = None
 
    parser, args, other = argparser(argv)
 

	
 
    api_credentials_given = (args.apikey and args.apihost)
 
    if args.save_config:
 
        if not api_credentials_given:
 
            raise parser.error('--save-config requires --apikey and --apihost')
 
        conf = RcConf(config_location=args.config,
 
                      autocreate=True, config={'apikey': args.apikey,
 
                                               'apihost': args.apihost})
 
        sys.exit()
 

	
 
    if not conf:
 
        conf = RcConf(config_location=args.config, autoload=True)
 
        if not conf:
 
            if not api_credentials_given:
 
                parser.error('Could not find config file and missing '
 
                             '--apikey or --apihost in params')
 

	
 
    apikey = args.apikey or conf['apikey']
 
    host = args.apihost or conf['apihost']
 
    DEFAULT_FILENAME = 'gistfile1.txt'
 
    if other:
 
        # skip multifiles for now
 
        filename = other[0]
 
        if filename == '-':
 
            filename = DEFAULT_FILENAME
 
            gist_content = ''
 
            for line in fileinput.input('-'):
 
                gist_content += line
 
        else:
 
            with open(filename, 'rb') as f:
 
                gist_content = f.read()
 

	
 
    else:
 
        filename = DEFAULT_FILENAME
 
        gist_content = None
 
        # little bit hacky but cross platform check where the
 
        # stdin comes from we skip the terminal case it can be handled by '-'
 
        mode = os.fstat(0).st_mode
 
        if stat.S_ISFIFO(mode):
 
            # "stdin is piped"
 
            gist_content = sys.stdin.read()
 
        elif stat.S_ISREG(mode):
 
            # "stdin is redirected"
 
            gist_content = sys.stdin.read()
 
        else:
 
            # "stdin is terminal"
 
            pass
kallithea/config/routing.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Routes configuration
 

	
 
The more specific and detailed routes should be defined first so they
 
may take precedent over the more generic routes. For more information
 
refer to the routes manual at http://routes.groovie.org/docs/
 
"""
 

	
 
from __future__ import with_statement
 
from routes import Mapper
 

	
 
# prefix for non repository related links needs to be prefixed with `/`
 
ADMIN_PREFIX = '/_admin'
 

	
 

	
 
def make_map(config):
 
    """Create, configure and return the routes Mapper"""
 
    rmap = Mapper(directory=config['pylons.paths']['controllers'],
 
                  always_scan=config['debug'])
 
    rmap.minimization = False
 
    rmap.explicit = False
 

	
 
    from kallithea.lib.utils import (is_valid_repo, is_valid_repo_group,
 
                                     get_repo_by_id)
 

	
 
    def check_repo(environ, match_dict):
 
        """
 
        check for valid repository for proper 404 handling
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        repo_name = match_dict.get('repo_name')
 

	
 
        if match_dict.get('f_path'):
 
            #fix for multiple initial slashes that causes errors
 
            match_dict['f_path'] = match_dict['f_path'].lstrip('/')
 

	
 
        by_id_match = get_repo_by_id(repo_name)
 
        if by_id_match:
 
            repo_name = by_id_match
 
            match_dict['repo_name'] = repo_name
 

	
 
        return is_valid_repo(repo_name, config['base_path'])
 

	
 
    def check_group(environ, match_dict):
 
        """
 
        check for valid repository group for proper 404 handling
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        repo_group_name = match_dict.get('group_name')
 
        return is_valid_repo_group(repo_group_name, config['base_path'])
 

	
 
    def check_group_skip_path(environ, match_dict):
 
        """
 
        check for valid repository group for proper 404 handling, but skips
 
        verification of existing path
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        repo_group_name = match_dict.get('group_name')
 
        return is_valid_repo_group(repo_group_name, config['base_path'],
 
                                   skip_path_check=True)
 

	
 
    def check_user_group(environ, match_dict):
 
        """
 
        check for valid user group for proper 404 handling
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        return True
 

	
 
    def check_int(environ, match_dict):
 
        return match_dict.get('id').isdigit()
 

	
 
    # The ErrorController route (handles 404/500 error pages); it should
 
    # likely stay at the top, ensuring it can always be resolved
 
    rmap.connect('/error/{action}', controller='error')
 
    rmap.connect('/error/{action}/{id}', controller='error')
 

	
 
    #==========================================================================
 
    # CUSTOM ROUTES HERE
 
    #==========================================================================
 

	
 
    #MAIN PAGE
 
    rmap.connect('home', '/', controller='home', action='index')
 
    rmap.connect('about', '/about', controller='home', action='about')
 
    rmap.connect('repo_switcher_data', '/_repos', controller='home',
 
                 action='repo_switcher_data')
 

	
 
    rmap.connect('rst_help',
 
                 "http://docutils.sourceforge.net/docs/user/rst/quickref.html",
 
                 _static=True)
 
    rmap.connect('kallithea_project_url', "https://kallithea-scm.org/", _static=True)
 
    rmap.connect('issues_url', 'https://bitbucket.org/conservancy/kallithea/issues', _static=True)
 

	
 
    #ADMIN REPOSITORY ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/repos') as m:
 
        m.connect("repos", "/repos",
 
                  action="create", conditions=dict(method=["POST"]))
kallithea/controllers/files.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.files
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Files controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import logging
 
import traceback
 
import tempfile
 
import shutil
 

	
 
from pylons import request, response, tmpl_context as c, url
 
from pylons.i18n.translation import _
 
from pylons.controllers.util import redirect
 
from kallithea.lib.utils import jsonify, action_logger
 

	
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 

	
 
from kallithea.lib.compat import OrderedDict
 
from kallithea.lib.utils2 import convert_line_endings, detect_mode, safe_str,\
 
    str2bool
 
from kallithea.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import RepositoryError, \
 
    ChangesetDoesNotExistError, EmptyRepositoryError, \
 
    ImproperArchiveTypeError, VCSError, NodeAlreadyExistsError,\
 
    NodeDoesNotExistError, ChangesetError, NodeError
 
from kallithea.lib.vcs.nodes import FileNode
 

	
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.db import Repository
 

	
 
from kallithea.controllers.changeset import anchor_url, _ignorews_url,\
 
    _context_url, get_line_ctx, get_ignore_ws
 
from webob.exc import HTTPNotFound
 
from kallithea.lib.exceptions import NonRelativePathError
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FilesController(BaseRepoController):
 

	
 
    def __before__(self):
 
        super(FilesController, self).__before__()
 
        c.cut_off_limit = self.cut_off_limit
 

	
 
    def __get_cs(self, rev, silent_empty=False):
 
        """
 
        Safe way to get changeset if error occur it redirects to tip with
 
        proper message
 

	
 
        :param rev: revision to fetch
 
        :silent_empty: return None if repository is empty
 
        """
 

	
 
        try:
 
            return c.db_repo_scm_instance.get_changeset(rev)
 
        except EmptyRepositoryError as e:
 
            if silent_empty:
 
                return None
 
            url_ = url('files_add_home',
 
                       repo_name=c.repo_name,
 
                       revision=0, f_path='', anchor='edit')
 
            add_new = h.link_to(_('Click here to add new file'), url_, class_="alert-link")
 
            h.flash(h.literal(_('There are no files yet. %s') % add_new),
 
                    category='warning')
 
            raise HTTPNotFound()
 
        except(ChangesetDoesNotExistError, LookupError), e:
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(safe_str(e), category='error')
 
            raise HTTPNotFound()
 

	
 
    def __get_filenode(self, cs, path):
 
        """
 
        Returns file_node or raise HTTP error.
 

	
 
        :param cs: given changeset
 
        :param path: path to lookup
 
        """
 

	
 
        try:
 
            file_node = cs.get_node(path)
 
            if file_node.is_dir():
 
                raise RepositoryError('given path is a directory')
 
        except(ChangesetDoesNotExistError,), e:
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(safe_str(e), category='error')
 
            raise HTTPNotFound()
 

	
 
        return file_node
kallithea/lib/auth.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth
 
~~~~~~~~~~~~~~~~~~
 

	
 
authentication and permission libraries
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
from __future__ import with_statement
 
import time
 
import os
 
import logging
 
import traceback
 
import hashlib
 
import itertools
 
import collections
 

	
 
from decorator import decorator
 

	
 
from pylons import url, request
 
from pylons.controllers.util import abort, redirect
 
from pylons.i18n.translation import _
 
from webhelpers.pylonslib import secure_form
 
from sqlalchemy import or_
 
from sqlalchemy.orm.exc import ObjectDeletedError
 
from sqlalchemy.orm import joinedload
 

	
 
from kallithea import __platform__, is_windows, is_unix
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model import meta
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.model.db import User, Repository, Permission, \
 
    UserToPerm, UserGroupRepoToPerm, UserGroupToPerm, UserGroupMember, \
 
    RepoGroup, UserGroupRepoGroupToPerm, UserIpMap, UserGroupUserGroupToPerm, \
 
    UserGroup, UserApiKeys
 

	
 
from kallithea.lib.utils2 import safe_unicode, aslist
 
from kallithea.lib.utils import get_repo_slug, get_repo_group_slug, \
 
    get_user_group_slug, conditional_cache
 
from kallithea.lib.caching_query import FromCache
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PasswordGenerator(object):
 
    """
 
    This is a simple class for generating password from different sets of
 
    characters
 
    usage::
 

	
 
        passwd_gen = PasswordGenerator()
 
        #print 8-letter password containing only big and small letters
 
            of alphabet
 
        passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
 
    """
 
    ALPHABETS_NUM = r'''1234567890'''
 
    ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
 
    ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
 
    ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
 
    ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
 
        + ALPHABETS_NUM + ALPHABETS_SPECIAL
 
    ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
 
    ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
 
    ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
 
    ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
 

	
 
    def gen_password(self, length, alphabet=ALPHABETS_FULL):
 
        assert len(alphabet) <= 256, alphabet
 
        l = []
 
        while len(l) < length:
 
            i = ord(os.urandom(1))
 
            if i < len(alphabet):
 
                l.append(alphabet[i])
 
        return ''.join(l)
 

	
 

	
 
class KallitheaCrypto(object):
 

	
 
    @classmethod
 
    def hash_string(cls, str_):
 
        """
 
        Cryptographic function used for password hashing based on pybcrypt
 
        or Python's own OpenSSL wrapper on windows
 

	
 
        :param password: password to hash
 
        """
 
        if is_windows:
 
            return hashlib.sha256(str_).hexdigest()
 
        elif is_unix:
 
            import bcrypt
 
            return bcrypt.hashpw(str_, bcrypt.gensalt(10))
 
        else:
 
            raise Exception('Unknown or unsupported platform %s' \
 
                            % __platform__)
 

	
 
    @classmethod
 
    def hash_check(cls, password, hashed):
 
        """
 
        Checks matching password with it's hashed value, runs different
 
        implementation based on platform it runs on
 

	
 
        :param password: password
 
        :param hashed: password in hashed form
kallithea/lib/indexers/daemon.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.indexers.daemon
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
A daemon will read from task table and run tasks
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 26, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 
import traceback
 

	
 
from shutil import rmtree
 
from time import mktime
 

	
 
from os.path import dirname as dn
 
from os.path import join as jn
 

	
 
# Add location of top level folder to sys.path
 
project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
 
sys.path.append(project_path)
 

	
 
from kallithea.config.conf import INDEX_EXTENSIONS
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.db import Repository
 
from kallithea.lib.utils2 import safe_unicode, safe_str
 
from kallithea.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, \
 
    CHGSET_IDX_NAME
 

	
 
from kallithea.lib.vcs.exceptions import ChangesetError, RepositoryError, \
 
    NodeDoesNotExistError
 

	
 
from whoosh.index import create_in, open_dir, exists_in
 
from whoosh.query import *
 
from whoosh.qparser import QueryParser
 

	
 
log = logging.getLogger('whoosh_indexer')
 

	
 

	
 
class WhooshIndexingDaemon(object):
 
    """
 
    Daemon for atomic indexing jobs
 
    """
 

	
 
    def __init__(self, indexname=IDX_NAME, index_location=None,
 
                 repo_location=None, sa=None, repo_list=None,
 
                 repo_update_list=None):
 
        self.indexname = indexname
 

	
 
        self.index_location = index_location
 
        if not index_location:
 
            raise Exception('You have to provide index location')
 

	
 
        self.repo_location = repo_location
 
        if not repo_location:
 
            raise Exception('You have to provide repositories location')
 

	
 
        self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)
 

	
 
        #filter repo list
 
        if repo_list:
 
            #Fix non-ascii repo names to unicode
 
            repo_list = map(safe_unicode, repo_list)
 
            self.filtered_repo_paths = {}
 
            for repo_name, repo in self.repo_paths.items():
 
                if repo_name in repo_list:
 
                    self.filtered_repo_paths[repo_name] = repo
 

	
 
            self.repo_paths = self.filtered_repo_paths
 

	
 
        #filter update repo list
 
        self.filtered_repo_update_paths = {}
 
        if repo_update_list:
 
            self.filtered_repo_update_paths = {}
 
            for repo_name, repo in self.repo_paths.items():
 
                if repo_name in repo_update_list:
 
                    self.filtered_repo_update_paths[repo_name] = repo
 
            self.repo_paths = self.filtered_repo_update_paths
 

	
 
        self.initial = True
 
        if not os.path.isdir(self.index_location):
 
            os.makedirs(self.index_location)
 
            log.info('Cannot run incremental index since it does not '
 
                     'yet exist running full build')
 
        elif not exists_in(self.index_location, IDX_NAME):
 
            log.info('Running full index build as the file content '
 
                     'index does not exist')
 
        elif not exists_in(self.index_location, CHGSET_IDX_NAME):
 
            log.info('Running full index build as the changeset '
 
                     'index does not exist')
 
        else:
 
            self.initial = False
 

	
 
    def _get_index_revision(self, repo):
 
        db_repo = Repository.get_by_repo_name(repo.name_unicode)
 
        landing_rev = 'tip'
 
        if db_repo:
 
            _rev_type, _rev = db_repo.landing_rev
 
            landing_rev = _rev
 
        return landing_rev
 

	
 
    def _get_index_changeset(self, repo, index_rev=None):
kallithea/lib/paster_commands/cache_keys.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.cache_keys
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
cleanup-keys paster command for Kallithea
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: mar 27, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 

	
 
from kallithea.model.meta import Session
 
from kallithea.lib.utils import BasePasterCommand
 
from kallithea.model.db import CacheInvalidation
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Cache keys utils"
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 
        _caches = CacheInvalidation.query().order_by(CacheInvalidation.cache_key).all()
 
        if self.options.show:
 
            for c_obj in _caches:
 
                print 'key:%s active:%s' % (c_obj.cache_key, c_obj.cache_active)
 
        elif self.options.cleanup:
 
            for c_obj in _caches:
 
                Session().delete(c_obj)
 
                print 'removing key:%s' % (c_obj.cache_key)
 
                Session().commit()
 
        else:
 
            print 'nothing done exiting...'
 
        sys.exit(0)
 

	
 
    def update_parser(self):
 
        self.parser.add_option(
 
            '--show',
 
            action='store_true',
 
            dest='show',
 
            help=("show existing cache keys with together with status")
 
        )
 

	
 
        self.parser.add_option(
 
            '--cleanup',
 
            action="store_true",
 
            dest="cleanup",
 
            help="cleanup existing cache keys"
 
        )
kallithea/lib/paster_commands/cleanup.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.cleanup
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
cleanup-repos paster command for Kallithea
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jul 14, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import re
 
import shutil
 
import logging
 
import datetime
 

	
 
from kallithea.lib.utils import BasePasterCommand, ask_ok, REMOVED_REPO_PAT
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.model.db import Ui
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Cleanup deleted repos"
 

	
 
    def _parse_older_than(self, val):
 
        regex = re.compile(r'((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)m)?((?P<seconds>\d+?)s)?')
 
        parts = regex.match(val)
 
        if not parts:
 
            return
 
        parts = parts.groupdict()
 
        time_params = {}
 
        for (name, param) in parts.iteritems():
 
            if param:
 
                time_params[name] = int(param)
 
        return datetime.timedelta(**time_params)
 

	
 
    def _extract_date(self, name):
 
        """
 
        Extract the date part from rm__<date> pattern of removed repos,
 
        and convert it to datetime object
 

	
 
        :param name:
 
        """
 
        date_part = name[4:19]  # 4:19 since we don't parse milisecods
 
        return datetime.datetime.strptime(date_part, '%Y%m%d_%H%M%S')
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 

	
 
        repos_location = Ui.get_repos_location()
 
        to_remove = []
 
        for dn_, dirs, f in os.walk(safe_str(repos_location)):
 
            alldirs = list(dirs)
 
            del dirs[:]
 
            if ('.hg' in alldirs or
 
                'objects' in alldirs and ('refs' in alldirs or 'packed-refs' in f)):
 
                continue
 
            for loc in alldirs:
 
                if REMOVED_REPO_PAT.match(loc):
 
                    to_remove.append([os.path.join(dn_, loc),
 
                                      self._extract_date(loc)])
 
                else:
 
                    dirs.append(loc)
 

	
 
        #filter older than (if present)!
 
        now = datetime.datetime.now()
 
        older_than = self.options.older_than
 
        if older_than:
 
            to_remove_filtered = []
 
            older_than_date = self._parse_older_than(older_than)
 
            for name, date_ in to_remove:
 
                repo_age = now - date_
 
                if repo_age > older_than_date:
 
                    to_remove_filtered.append([name, date_])
 

	
 
            to_remove = to_remove_filtered
 
            print >> sys.stdout, 'removing %s deleted repos older than %s (%s)' \
 
                % (len(to_remove), older_than, older_than_date)
 
        else:
 
            print >> sys.stdout, 'removing all [%s] deleted repos' \
 
                % len(to_remove)
 
        if self.options.dont_ask or not to_remove:
 
            # don't ask just remove !
 
            remove = True
 
        else:
 
            remove = ask_ok('the following repositories will be deleted completely:\n%s\n'
 
                            'are you sure you want to remove them [y/n]?'
 
                            % ', \n'.join(['%s removed on %s'
kallithea/lib/paster_commands/ishell.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.ishell
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
interactive shell paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 

	
 
from kallithea.lib.utils import BasePasterCommand
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Interactive shell"
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 

	
 
        # imports, used in ipython shell
 
        import os
 
        import sys
 
        import time
 
        import shutil
 
        import datetime
 
        from kallithea.model.db import *
 

	
 
        try:
 
            from IPython import embed
 
            from IPython.config.loader import Config
 
            cfg = Config()
 
            cfg.InteractiveShellEmbed.confirm_exit = False
 
            embed(config=cfg, banner1="Kallithea IShell.")
 
        except ImportError:
 
            print 'ipython installation required for ishell'
 
            sys.exit(-1)
 

	
 
    def update_parser(self):
 
        pass
kallithea/lib/paster_commands/make_index.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.make_index
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
make-index paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 17, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 

	
 
from string import strip
 
from kallithea.model.repo import RepoModel
 
from kallithea.lib.utils import BasePasterCommand, load_rcextensions
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Creates or updates full text search index"
 

	
 
    def command(self):
 
        logging.config.fileConfig(self.path_to_ini_file)
 
        #get SqlAlchemy session
 
        self._init_session()
 
        from pylons import config
 
        index_location = config['index_dir']
 
        load_rcextensions(config['here'])
 

	
 
        repo_location = self.options.repo_location \
 
            if self.options.repo_location else RepoModel().repos_path
 
        repo_list = map(strip, self.options.repo_list.split(',')) \
 
            if self.options.repo_list else None
 

	
 
        repo_update_list = map(strip, self.options.repo_update_list.split(',')) \
 
            if self.options.repo_update_list else None
 

	
 
        #======================================================================
 
        # WHOOSH DAEMON
 
        #======================================================================
 
        from kallithea.lib.pidlock import LockHeld, DaemonLock
 
        from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
        try:
 
            l = DaemonLock(file_=os.path.join(dn(dn(index_location)),
 
                                              'make_index.lock'))
 
            WhooshIndexingDaemon(index_location=index_location,
 
                                 repo_location=repo_location,
 
                                 repo_list=repo_list,
 
                                 repo_update_list=repo_update_list)\
 
                .run(full_index=self.options.full_index)
 
            l.release()
 
        except LockHeld:
 
            sys.exit(1)
 

	
 
    def update_parser(self):
 
        self.parser.add_option('--repo-location',
 
                          action='store',
 
                          dest='repo_location',
 
                          help="Specifies repositories location to index OPTIONAL",
 
                          )
 
        self.parser.add_option('--index-only',
 
                          action='store',
 
                          dest='repo_list',
 
                          help="Specifies a comma separated list of repositories "
 
                                "to build index on. If not given all repositories "
 
                                "are scanned for indexing. OPTIONAL",
 
                          )
 
        self.parser.add_option('--update-only',
 
                          action='store',
 
                          dest='repo_update_list',
 
                          help="Specifies a comma separated list of repositories "
 
                                "to re-build index on. OPTIONAL",
 
                          )
 
        self.parser.add_option('-f',
 
                          action='store_true',
 
                          dest='full_index',
 
                          help="Specifies that index should be made full i.e"
 
                                " destroy old and build from scratch",
 
                          default=False)
kallithea/lib/paster_commands/make_rcextensions.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.make_rcextensions
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
make-rcext paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import pkg_resources
 

	
 
from kallithea.lib.utils import BasePasterCommand, ask_ok
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Write template file for extending Kallithea in Python."
 
    usage = "CONFIG_FILE"
 
    description = '''\
 
        A rcextensions directory with a __init__.py file will be created next to
 
        the ini file. Local customizations in that file will survive upgrades.
 
        The file contains instructions on how it can be customized.
 
        '''
 

	
 
    def command(self):
 
        from pylons import config
 

	
 
        here = config['here']
 
        content = pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'rcextensions', '__init__.py')
 
        )
 
        ext_file = os.path.join(here, 'rcextensions', '__init__.py')
 
        if os.path.exists(ext_file):
 
            msg = ('Extension file already exists, do you want '
 
                   'to overwrite it ? [y/n]')
 
            if not ask_ok(msg):
 
                print 'Nothing done...'
 
                return
 

	
 
        dirname = os.path.dirname(ext_file)
 
        if not os.path.isdir(dirname):
 
            os.makedirs(dirname)
 
        with open(ext_file, 'wb') as f:
 
            f.write(content)
 
            print 'Wrote new extensions file to %s' % ext_file
 

	
 
    def update_parser(self):
 
        pass
kallithea/lib/paster_commands/repo_scan.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.repo_scan
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
repo-scan paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Feb 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 

	
 
from kallithea.model.scm import ScmModel
 
from kallithea.lib.utils import BasePasterCommand, repo2db_mapper
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Rescan default location for new repositories"
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 
        rm_obsolete = self.options.delete_obsolete
 
        log.info('Now scanning root location for new repos...')
 
        added, removed = repo2db_mapper(ScmModel().repo_scan(),
 
                                        remove_obsolete=rm_obsolete)
 
        added = ', '.join(added) or '-'
 
        removed = ', '.join(removed) or '-'
 
        log.info('Scan completed added: %s removed: %s', added, removed)
 

	
 
    def update_parser(self):
 
        self.parser.add_option(
 
            '--delete-obsolete',
 
            action='store_true',
 
            help="Use this flag do delete repositories that are "
 
                 "present in Kallithea database but not on the filesystem",
 
        )
kallithea/lib/paster_commands/update_repoinfo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.update_repoinfo
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
update-repoinfo paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jul 14, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 
import string
 

	
 
from kallithea.lib.utils import BasePasterCommand
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.meta import Session
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Updates repositories caches for last changeset"
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 

	
 
        repo_update_list = map(string.strip,
 
                               self.options.repo_update_list.split(',')) \
 
                               if self.options.repo_update_list else None
 

	
 
        if repo_update_list:
 
            repo_list = list(Repository.query()\
 
                .filter(Repository.repo_name.in_(repo_update_list)))
 
        else:
 
            repo_list = Repository.getAll()
 
        RepoModel.update_repoinfo(repositories=repo_list)
 
        Session().commit()
 

	
 
        if self.options.invalidate_cache:
 
            for r in repo_list:
 
                r.set_invalidate()
 
        print 'Updated cache for %s repositories' % (len(repo_list))
 

	
 
    def update_parser(self):
 
        self.parser.add_option('--update-only',
 
                           action='store',
 
                           dest='repo_update_list',
 
                           help="Specifies a comma separated list of repositories "
 
                                "to update last commit info for. OPTIONAL")
 
        self.parser.add_option('--invalidate-cache',
 
                           action='store_true',
 
                           dest='invalidate_cache',
 
                           help="Trigger cache invalidation event for repos. "
 
                                "OPTIONAL")
kallithea/lib/pidlock.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from __future__ import with_statement
 
import os
 
import errno
 

	
 
from multiprocessing.util import Finalize
 

	
 
from kallithea.lib.compat import kill
 

	
 

	
 
class LockHeld(Exception):
 
    pass
 

	
 

	
 
class DaemonLock(object):
 
    """daemon locking
 
    USAGE:
 
    try:
 
        l = DaemonLock(file_='/path/tolockfile',desc='test lock')
 
        main()
 
        l.release()
 
    except LockHeld:
 
        sys.exit(1)
 
    """
 

	
 
    def __init__(self, file_=None, callbackfn=None,
 
                 desc='daemon lock', debug=False):
 

	
 
        lock_name = os.path.join(os.path.dirname(__file__), 'running.lock')
 
        self.pidfile = file_ if file_ else lock_name
 
        self.callbackfn = callbackfn
 
        self.desc = desc
 
        self.debug = debug
 
        self.held = False
 
        #run the lock automatically !
 
        self.lock()
 
        self._finalize = Finalize(self, DaemonLock._on_finalize,
 
                                  args=(self, debug), exitpriority=10)
 

	
 
    @staticmethod
 
    def _on_finalize(lock, debug):
 
        if lock.held:
 
            if debug:
 
                print 'lock held finalizing and running lock.release()'
 
            lock.release()
 

	
 
    def lock(self):
 
        """
 
        locking function, if lock is present it
 
        will raise LockHeld exception
 
        """
 
        lockname = str(os.getpid())
 
        if self.debug:
 
            print 'running lock'
 
        self.trylock()
 
        self.makelock(lockname, self.pidfile)
 
        return True
 

	
 
    def trylock(self):
 
        running_pid = False
 
        if self.debug:
 
            print 'checking for already running process'
 
        try:
 
            with open(self.pidfile, 'r') as f:
 
                try:
 
                    running_pid = int(f.readline())
 
                except ValueError:
 
                    running_pid = -1
 

	
 
            if self.debug:
 
                print ('lock file present running_pid: %s, '
 
                       'checking for execution' % (running_pid,))
 
            # Now we check the PID from lock file matches to the current
 
            # process PID
 
            if running_pid:
 
                try:
 
                    kill(running_pid, 0)
 
                except OSError as exc:
 
                    if exc.errno in (errno.ESRCH, errno.EPERM):
 
                        print ("Lock File is there but"
 
                               " the program is not running")
 
                        print "Removing lock file for the: %s" % running_pid
 
                        self.release()
 
                    else:
 
                        raise
 
                else:
 
                    print "You already have an instance of the program running"
 
                    print "It is running as process %s" % running_pid
 
                    raise LockHeld()
 

	
 
        except IOError as e:
 
            if e.errno != 2:
 
                raise
 

	
 
    def release(self):
 
        """releases the pid by removing the pidfile
 
        """
 
        if self.debug:
kallithea/lib/profiler.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import objgraph
 
import cProfile
 
import pstats
 
import cgi
 
import pprint
 
import threading
 

	
 
from StringIO import StringIO
 

	
 

	
 
class ProfilingMiddleware(object):
 
    def __init__(self, app):
 
        self.lock = threading.Lock()
 
        self.app = app
 

	
 
    def __call__(self, environ, start_response):
 
        with self.lock:
 
            profiler = cProfile.Profile()
 

	
 
            def run_app(*a, **kw):
 
                self.response = self.app(environ, start_response)
 

	
 
            profiler.runcall(run_app, environ, start_response)
 

	
 
            profiler.snapshot_stats()
 

	
 
            stats = pstats.Stats(profiler)
 
            stats.sort_stats('calls') #cumulative
 

	
 
            # Redirect output
 
            out = StringIO()
 
            stats.stream = out
 

	
 
            stats.print_stats()
 

	
 
            resp = ''.join(self.response)
 

	
 
            # Lets at least only put this on html-like responses.
 
            if resp.strip().startswith('<'):
 
                ## The profiling info is just appended to the response.
 
                ##  Browsers don't mind this.
 
                resp += ('<pre style="text-align:left; '
 
                         'border-top: 4px dashed red; padding: 1em;">')
 
                resp += cgi.escape(out.getvalue(), True)
 

	
 
                ct = objgraph.show_most_common_types()
 
                print ct
 

	
 
                resp += ct if ct else '---'
 

	
 
                output = StringIO()
 
                pprint.pprint(environ, output, depth=3)
 

	
 
                resp += cgi.escape(output.getvalue(), True)
 
                resp += '</pre>'
 

	
 
            return resp
kallithea/model/api_key.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.api_key
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
API key model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Sep 8, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import time
 
import logging
 
from sqlalchemy import or_
 

	
 
from kallithea.lib.utils2 import generate_api_key
 
from kallithea.model import BaseModel
 
from kallithea.model.db import UserApiKeys
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ApiKeyModel(BaseModel):
 
    cls = UserApiKeys
 

	
 
    def create(self, user, description, lifetime=-1):
 
        """
 
        :param user: user or user_id
 
        :param description: description of ApiKey
 
        :param lifetime: expiration time in seconds
 
        """
 
        user = self._get_user(user)
 

	
 
        new_api_key = UserApiKeys()
 
        new_api_key.api_key = generate_api_key()
 
        new_api_key.user_id = user.user_id
 
        new_api_key.description = description
 
        new_api_key.expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        Session().add(new_api_key)
 

	
 
        return new_api_key
 

	
 
    def delete(self, api_key, user=None):
 
        """
 
        Deletes given api_key, if user is set it also filters the object for
 
        deletion by given user.
 
        """
 
        api_key = UserApiKeys.query().filter(UserApiKeys.api_key == api_key)
 

	
 
        if user is not None:
 
            user = self._get_user(user)
 
            api_key = api_key.filter(UserApiKeys.user_id == user.user_id)
 

	
 
        api_key = api_key.scalar()
 
        Session().delete(api_key)
 

	
 
    def get_api_keys(self, user, show_expired=True):
 
        user = self._get_user(user)
 
        user_api_keys = UserApiKeys.query()\
 
            .filter(UserApiKeys.user_id == user.user_id)
 
        if not show_expired:
 
            user_api_keys = user_api_keys\
 
                .filter(or_(UserApiKeys.expires == -1,
 
                            UserApiKeys.expires >= time.time()))
 
        return user_api_keys
kallithea/model/gist.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.gist
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
gist model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import time
 
import logging
 
import traceback
 
import shutil
 

	
 
from kallithea.lib.utils2 import safe_unicode, unique_id, safe_int, \
 
    time_to_datetime, AttributeDict
 
from kallithea.lib.compat import json
 
from kallithea.model import BaseModel
 
from kallithea.model.db import Gist
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 
log = logging.getLogger(__name__)
 

	
 
GIST_STORE_LOC = '.rc_gist_store'
 
GIST_METADATA_FILE = '.rc_gist_metadata'
 

	
 

	
 
class GistModel(BaseModel):
 
    cls = Gist
 

	
 
    def _get_gist(self, gist):
 
        """
 
        Helper method to get gist by ID, or gist_access_id as a fallback
 

	
 
        :param gist: GistID, gist_access_id, or Gist instance
 
        """
 
        return self._get_instance(Gist, gist, callback=Gist.get_by_access_id)
 

	
 
    def __delete_gist(self, gist):
 
        """
 
        removes gist from filesystem
 

	
 
        :param gist: gist object
 
        """
 
        root_path = RepoModel().repos_path
 
        rm_path = os.path.join(root_path, GIST_STORE_LOC, gist.gist_access_id)
 
        log.info("Removing %s", rm_path)
 
        shutil.rmtree(rm_path)
 

	
 
    def _store_metadata(self, repo, gist_id, gist_access_id, user_id, gist_type,
 
                        gist_expires):
 
        """
 
        store metadata inside the gist, this can be later used for imports
 
        or gist identification
 
        """
 
        metadata = {
 
            'metadata_version': '1',
 
            'gist_db_id': gist_id,
 
            'gist_access_id': gist_access_id,
 
            'gist_owner_id': user_id,
 
            'gist_type': gist_type,
 
            'gist_expires': gist_expires,
 
            'gist_updated': time.time(),
 
        }
 
        with open(os.path.join(repo.path, '.hg', GIST_METADATA_FILE), 'wb') as f:
 
            f.write(json.dumps(metadata))
 

	
 
    def get_gist(self, gist):
 
        return self._get_gist(gist)
 

	
 
    def get_gist_files(self, gist_access_id, revision=None):
 
        """
 
        Get files for given gist
 

	
 
        :param gist_access_id:
 
        """
 
        repo = Gist.get_by_access_id(gist_access_id)
 
        cs = repo.scm_instance.get_changeset(revision)
 
        return cs, [n for n in cs.get_node('/')]
 

	
 
    def create(self, description, owner, gist_mapping,
 
               gist_type=Gist.GIST_PUBLIC, lifetime=-1):
 
        """
 

	
 
        :param description: description of the gist
 
        :param owner: user who created this gist
 
        :param gist_mapping: mapping {filename:{'content':content},...}
 
        :param gist_type: type of gist private/public
 
        :param lifetime: in minutes, -1 == forever
 
        """
 
        owner = self._get_user(owner)
 
        gist_id = safe_unicode(unique_id(20))
 
        lifetime = safe_int(lifetime, -1)
 
        gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        log.debug('set GIST expiration date to: %s',
 
                  time_to_datetime(gist_expires)
 
                   if gist_expires != -1 else 'forever')
 
        #create the Database version
 
        gist = Gist()
 
        gist.gist_description = description
 
        gist.gist_access_id = gist_id
 
        gist.gist_owner = owner.user_id
 
        gist.gist_expires = gist_expires
kallithea/model/repo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.repo
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Repository model for kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 5, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import shutil
 
import logging
 
import traceback
 
from datetime import datetime
 
from sqlalchemy.orm import subqueryload
 

	
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends import get_backend
 
from kallithea.lib.compat import json
 
from kallithea.lib.utils2 import LazyProperty, safe_str, safe_unicode, \
 
    remove_prefix, obfuscate_url_pw, get_current_authuser
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.hooks import log_delete_repository
 

	
 
from kallithea.model import BaseModel
 
from kallithea.model.db import Repository, UserRepoToPerm, UserGroupRepoToPerm, \
 
    UserRepoGroupToPerm, UserGroupRepoGroupToPerm, User, Permission, \
 
    Statistics, UserGroup, Ui, RepoGroup, RepositoryField
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionAny, HasUserGroupPermissionAny
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.model.scm import UserGroupList
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoModel(BaseModel):
 

	
 
    cls = Repository
 
    URL_SEPARATOR = Repository.url_sep()
 

	
 
    def _get_user_group(self, users_group):
 
        return self._get_instance(UserGroup, users_group,
 
                                  callback=UserGroup.get_by_group_name)
 

	
 
    def _get_repo_group(self, repo_group):
 
        return self._get_instance(RepoGroup, repo_group,
 
                                  callback=RepoGroup.get_by_group_name)
 

	
 
    def _create_default_perms(self, repository, private):
 
        # create default permission
 
        default = 'repository.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('repository.'):
 
                default = p.permission.permission_name
 
                break
 

	
 
        default_perm = 'repository.none' if private else default
 

	
 
        repo_to_perm = UserRepoToPerm()
 
        repo_to_perm.permission = Permission.get_by_key(default_perm)
 

	
 
        repo_to_perm.repository = repository
 
        repo_to_perm.user_id = def_user.user_id
 

	
 
        return repo_to_perm
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = self.sa.query(Ui).filter(Ui.ui_key == '/').one()
 
        return q.ui_value
 

	
 
    def get(self, repo_id, cache=False):
 
        repo = self.sa.query(Repository) \
 
            .filter(Repository.repo_id == repo_id)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_id))
 
        return repo.scalar()
 

	
 
    def get_repo(self, repository):
 
        return self._get_repo(repository)
 

	
 
    def get_by_repo_name(self, repo_name, cache=False):
 
        repo = self.sa.query(Repository) \
 
            .filter(Repository.repo_name == repo_name)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_name))
 
        return repo.scalar()
 

	
 
    def get_all_user_repos(self, user):
 
        """
 
        Gets all repositories that user have at least read access
 

	
 
        :param user:
 
        """
kallithea/model/scm.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.scm
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Scm model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import re
 
import time
 
import traceback
 
import logging
 
import cStringIO
 
import pkg_resources
 
from os.path import join as jn
 

	
 
from sqlalchemy import func
 
from pylons.i18n.translation import _
 

	
 
import kallithea
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea import BACKENDS
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url,\
 
    _set_extras
 
from kallithea.lib.auth import HasRepoPermissionAny, HasRepoGroupPermissionAny,\
 
    HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAll
 
from kallithea.lib.utils import get_filesystem_repos, make_ui, \
 
    action_logger
 
from kallithea.model import BaseModel
 
from kallithea.model.db import Repository, Ui, CacheInvalidation, \
 
    UserFollowing, UserLog, User, RepoGroup, PullRequest
 
from kallithea.lib.hooks import log_push_action
 
from kallithea.lib.exceptions import NonRelativePathError, IMCCommitError
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserTemp(object):
 
    def __init__(self, user_id):
 
        self.user_id = user_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.user_id)
 

	
 

	
 
class RepoTemp(object):
 
    def __init__(self, repo_id):
 
        self.repo_id = repo_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.repo_id)
 

	
 

	
 
class CachedRepoList(object):
 
    """
 
    Cached repo list. Uses super-fast in-memory cache after initialization.
 
    """
 

	
 
    def __init__(self, db_repo_list, repos_path, order_by=None, perm_set=None):
 
        self.db_repo_list = db_repo_list
 
        self.repos_path = repos_path
 
        self.order_by = order_by
 
        self.reversed = (order_by or '').startswith('-')
 
        if not perm_set:
 
            perm_set = ['repository.read', 'repository.write',
 
                        'repository.admin']
 
        self.perm_set = perm_set
 

	
 
    def __len__(self):
 
        return len(self.db_repo_list)
 

	
 
    def __repr__(self):
 
        return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
 

	
 
    def __iter__(self):
 
        # pre-propagated valid_cache_keys to save executing select statements
 
        # for each repo
 
        valid_cache_keys = CacheInvalidation.get_valid_cache_keys()
 

	
 
        for dbr in self.db_repo_list:
 
            scmr = dbr.scm_instance_cached(valid_cache_keys)
 
            # check permission at this level
 
            if not HasRepoPermissionAny(
 
                *self.perm_set)(dbr.repo_name, 'get repo check'):
 
                continue
 

	
 
            try:
 
                last_change = scmr.last_change
 
                tip = h.get_changeset_safe(scmr, 'tip')
 
            except Exception:
 
                log.error(
 
                    '%s this repository is present in database but it '
 
                    'cannot be created as an scm instance, org_exc:%s'
 
                    % (dbr.repo_name, traceback.format_exc())
 
                )
 
                continue
 

	
kallithea/tests/api/api_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
tests for api. run with::
 

	
 
    KALLITHEA_WHOOSH_TEST_DISABLE=1 nosetests --with-coverage --cover-package=kallithea.controllers.api.api -x kallithea/tests/api
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import random
 
import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.db import Repository, User, Setting
 
from kallithea.lib.utils2 import time_to_datetime
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = 'test_user_group'
 
TEST_REPO_GROUP = 'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 

	
 
    :param random_id:
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 

	
 
jsonify = lambda obj: json.loads(json.dumps(obj))
 

	
 

	
 
def crash(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
def api_call(test_obj, params):
 
    response = test_obj.app.post(API_URL, content_type='application/json',
 
                                 params=params)
 
    return response
 

	
 

	
 
## helpers
 
def make_user_group(name=TEST_USER_GROUP):
 
    gr = fixture.create_user_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    UserGroupModel().add_user_to_group(user_group=gr,
 
                                       user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
def make_repo_group(name=TEST_REPO_GROUP):
 
    gr = fixture.create_repo_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
class _BaseTestApi(object):
 
    REPO = None
 
    REPO_TYPE = None
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        cls.usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        cls.apikey = cls.usr.api_key
 
        cls.test_user = UserModel().create_or_update(
 
            username='test-api',
 
            password='test',
 
            email='test@example.com',
 
            firstname='first',
 
            lastname='last'
 
        )
 
        Session().commit()
 
        cls.TEST_USER_LOGIN = cls.test_user.username
 
        cls.apikey_regular = cls.test_user.api_key
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        pass
 

	
 
    def setUp(self):
 
        self.maxDiff = None
 
        make_user_group()
 
        make_repo_group()
kallithea/tests/functional/test_admin.py
Show inline comments
 
from __future__ import with_statement
 
import os
 
import csv
 
import datetime
 
from kallithea.tests import *
 
from kallithea.model.db import UserLog
 
from kallithea.model.meta import Session
 
from kallithea.lib.utils2 import safe_unicode
 

	
 
dn = os.path.dirname
 
FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'fixtures')
 

	
 

	
 
class TestAdminController(TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        def strptime(val):
 
            fmt = '%Y-%m-%d %H:%M:%S'
 
            if '.' not in val:
 
                return datetime.datetime.strptime(val, fmt)
 

	
 
            nofrag, frag = val.split(".")
 
            date = datetime.datetime.strptime(nofrag, fmt)
 

	
 
            frag = frag[:6]  # truncate to microseconds
 
            frag += (6 - len(frag)) * '0'  # add 0s
 
            return date.replace(microsecond=int(frag))
 

	
 
        with open(os.path.join(FIXTURES, 'journal_dump.csv')) as f:
 
            for row in csv.DictReader(f):
 
                ul = UserLog()
 
                for k, v in row.iteritems():
 
                    v = safe_unicode(v)
 
                    if k == 'action_date':
 
                        v = strptime(v)
 
                    if k in ['user_id', 'repository_id']:
 
                        # nullable due to FK problems
 
                        v = None
 
                    setattr(ul, k, v)
 
                Session().add(ul)
 
            Session().commit()
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index'))
 
        response.mustcontain('Admin Journal')
 

	
 
    def test_filter_all_entries(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',))
 
        response.mustcontain('2034 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:xxx'))
 
        response.mustcontain('3 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_repository_CamelCase(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:XxX'))
 
        response.mustcontain('3 Entries')
 

	
 
    def test_filter_journal_filter_wildcard_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:*test*'))
 
        response.mustcontain('862 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:test*'))
 
        response.mustcontain('257 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_CamelCase(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:Test*'))
 
        response.mustcontain('257 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_and_user(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:test* AND username:demo'))
 
        response.mustcontain('130 Entries')
 

	
kallithea/tests/functional/test_login.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from __future__ import with_statement
 
import re
 
import time
 

	
 
import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.utils2 import generate_api_key
 
from kallithea.lib.auth import check_password
 
from kallithea.lib import helpers as h
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model import validators
 
from kallithea.model.db import User, Notification
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestLoginController(TestController):
 
    def setUp(self):
 
        self.remove_all_notifications()
 
        self.assertEqual(Notification.query().all(), [])
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertEqual(response.status, '200 OK')
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        self.assert_authenticated_user(response, TEST_USER_ADMIN_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        response = response.follow()
 

	
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('Users Administration')
 

	
 
    def test_login_do_not_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': False})
 

	
 
        self.assertIn('Set-Cookie', response.headers)
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            self.assertFalse(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
 
                'Cookie %r has expiration date, but should be a session cookie' % cookie)
 

	
 
    def test_login_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': True})
 

	
 
        self.assertIn('Set-Cookie', response.headers)
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            self.assertTrue(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
 
                'Cookie %r should have expiration date, but is a session cookie' % cookie)
 

	
 
    def test_logout(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        # Verify that a login session has been established.
 
        response = self.app.get(url(controller='login', action='index'))
 
        response = response.follow()
 
        self.assertIn('authuser', response.session)
 

	
 
        response.click('Log Out')
 

	
kallithea/tests/models/test_diff_parsers.py
Show inline comments
 
from __future__ import with_statement
 
from kallithea.tests import *
 
from kallithea.lib.diffs import DiffProcessor, NEW_FILENODE, DEL_FILENODE, \
 
    MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE, COPIED_FILENODE
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 

	
 
DIFF_FIXTURES = {
 
    'hg_diff_add_single_binary_file.diff': [
 
        ('US Warszawa.jpg', 'A',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {NEW_FILENODE: 'new file 100755',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
    ],
 
    'hg_diff_mod_single_binary_file.diff': [
 
        ('US Warszawa.jpg', 'M',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {MOD_FILENODE: 'modified file',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
    ],
 

	
 
    'hg_diff_mod_single_file_and_rename_and_chmod.diff': [
 
        ('README', 'R',
 
         {'added': 3,
 
          'deleted': 0,
 
          'binary': False,
 
          'ops': {RENAMED_FILENODE: 'file renamed from README.rst to README',
 
                  CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
 
    ],
 
    'hg_diff_mod_file_and_rename.diff': [
 
        ('README.rst', 'R',
 
         {'added': 3,
 
          'deleted': 0,
 
          'binary': False,
 
          'ops': {RENAMED_FILENODE: 'file renamed from README to README.rst'}}),
 
    ],
 
    'hg_diff_del_single_binary_file.diff': [
 
        ('US Warszawa.jpg', 'D',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {DEL_FILENODE: 'deleted file',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
    ],
 
    'hg_diff_chmod_and_mod_single_binary_file.diff': [
 
        ('gravatar.png', 'M',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
    ],
 
    'hg_diff_chmod.diff': [
 
        ('file', 'M',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
 
    ],
 
    'hg_diff_rename_file.diff': [
 
        ('file_renamed', 'R',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {RENAMED_FILENODE: 'file renamed from file to file_renamed'}}),
 
    ],
 
    'hg_diff_rename_and_chmod_file.diff': [
 
        ('README', 'R',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
 
                  RENAMED_FILENODE: 'file renamed from README.rst to README'}}),
 
    ],
 
    'hg_diff_binary_and_normal.diff': [
 
        ('img/baseline-10px.png', 'A',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {NEW_FILENODE: 'new file 100644',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
        ('img/baseline-20px.png', 'D',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {DEL_FILENODE: 'deleted file',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
        ('index.html', 'M',
 
         {'added': 3,
 
          'deleted': 2,
 
          'binary': False,
kallithea/tests/other/test_libs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.other.test_libs
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Package for testing various lib/helper functions in kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 9, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import datetime
 
import hashlib
 
import mock
 
from kallithea.tests import *
 
from kallithea.lib.utils2 import AttributeDict
 
from kallithea.model.db import Repository
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
 
     '%s://domain.org' % proto),
 
    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
 
                                                '8080'],
 
     '%s://domain.org:8080' % proto),
 
]
 

	
 
proto = 'https'
 
TEST_URLS += [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
 
     '%s://domain.org' % proto),
 
    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
 
                                                '8080'],
 
     '%s://domain.org:8080' % proto),
 
]
 

	
 

	
 
class TestLibs(BaseTestCase):
 

	
 
    @parameterized.expand(TEST_URLS)
 
    def test_uri_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import uri_filter
 
        self.assertEqual(uri_filter(test_url), expected)
 

	
 
    @parameterized.expand(TEST_URLS)
 
    def test_credentials_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import credentials_filter
 
        self.assertEqual(credentials_filter(test_url), expected_creds)
 

	
 
    @parameterized.expand([('t', True),
 
                           ('true', True),
 
                           ('y', True),
 
                           ('yes', True),
 
                           ('on', True),
 
                           ('1', True),
 
                           ('Y', True),
 
                           ('yeS', True),
 
                           ('Y', True),
 
                           ('TRUE', True),
 
                           ('T', True),
 
                           ('False', False),
 
                           ('F', False),
 
                           ('FALSE', False),
 
                           ('0', False),
 
                           ('-1', False),
 
                           ('', False)
 
    ])
 
    def test_str2bool(self, str_bool, expected):
 
        from kallithea.lib.utils2 import str2bool
 
        self.assertEqual(str2bool(str_bool), expected)
 

	
 
    def test_mention_extractor(self):
 
        from kallithea.lib.utils2 import extract_mentioned_users
 
        sample = (
 
            "@first hi there @world here's my email username@email.com "
 
            "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
 
            "@UPPER    @cAmEL @2one_more22 @john please see this http://org.pl "
 
            "@marian.user just do it @marco-polo and next extract @marco_polo "
 
            "user.dot  hej ! not-needed maril@domain.org"
 
        )
 

	
 
        s = sorted([
 
            '2one_more22', 'first', 'lukaszb', 'one', 'one_more22', 'UPPER', 'cAmEL', 'john',
 
            'marian.user', 'marco-polo', 'marco_polo', 'world'], key=lambda k: k.lower())
 
        self.assertEqual(s, extract_mentioned_users(sample))
 

	
 
    @parameterized.expand([
 
        (dict(), u'just now'),
 
        (dict(seconds= -1), u'1 second ago'),
 
        (dict(seconds= -60 * 2), u'2 minutes ago'),
 
        (dict(hours= -1), u'1 hour ago'),
kallithea/tests/vcs/test_archives.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
import tarfile
 
import zipfile
 
import datetime
 
import tempfile
 
import StringIO
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class ArchivesTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('%d/file_%d.txt' % (x, x),
 
                        content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_archive_zip(self):
 
        path = tempfile.mkstemp()[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='zip', prefix='repo')
 
        out = zipfile.ZipFile(path)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            decompressed = StringIO.StringIO()
 
            decompressed.write(out.read('repo/' + node_path))
 
            self.assertEqual(
 
                decompressed.getvalue(),
 
                self.tip.get_node(node_path).content)
 

	
 
    def test_archive_tgz(self):
 
        path = tempfile.mkstemp()[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='tgz', prefix='repo')
 
        outdir = tempfile.mkdtemp()
 

	
 
        outfile = tarfile.open(path, 'r|gz')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            self.assertEqual(
 
                open(os.path.join(outdir, 'repo/' + node_path)).read(),
 
                self.tip.get_node(node_path).content)
 

	
 
    def test_archive_tbz2(self):
 
        path = tempfile.mkstemp()[1]
 
        with open(path, 'w+b') as f:
 
            self.tip.fill_archive(stream=f, kind='tbz2', prefix='repo')
 
        outdir = tempfile.mkdtemp()
 

	
 
        outfile = tarfile.open(path, 'r|bz2')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            self.assertEqual(
 
                open(os.path.join(outdir, 'repo/' + node_path)).read(),
 
                self.tip.get_node(node_path).content)
 

	
 
    def test_archive_default_stream(self):
 
        tmppath = tempfile.mkstemp()[1]
 
        with open(tmppath, 'w') as stream:
 
            self.tip.fill_archive(stream=stream)
 
        mystream = StringIO.StringIO()
 
        self.tip.fill_archive(stream=mystream)
 
        mystream.seek(0)
 
        with open(tmppath, 'r') as f:
 
            self.assertEqual(f.read(), mystream.read())
 

	
 
    def test_archive_wrong_kind(self):
 
        with self.assertRaises(VCSError):
 
            self.tip.fill_archive(kind='wrong kind')
 

	
 
    def test_archive_empty_prefix(self):
 
        with self.assertRaises(VCSError):
 
            self.tip.fill_archive(prefix='')
 

	
 
    def test_archive_prefix_with_leading_slash(self):
 
        with self.assertRaises(VCSError):
 
            self.tip.fill_archive(prefix='/any')
 

	
kallithea/tests/vcs/test_branches.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.lib import vcs
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.lib.vcs.nodes import FileNode
 

	
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 

	
 

	
 
class BranchesTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='Foobar'),
 
                    FileNode('foobar2', content='Foobar II'),
 
                    FileNode('foo/bar/baz', content='baz here!'),
 
                ],
 
            },
 
            {
 
                'message': 'Changes...',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('some/new.txt', content='news...'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'Foobar I'),
 
                ],
 
                'removed': [],
 
            },
 
        ]
 
        return commits
 

	
 
    def test_simple(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.date, datetime.datetime(2010, 1, 1, 21))
 

	
 
    def test_new_branch(self):
 
        # This check must not be removed to ensure the 'branches' LazyProperty
 
        # gets hit *before* the new 'foobar' branch got created:
 
        self.assertFalse('foobar' in self.repo.branches)
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertTrue('foobar' in self.repo.branches)
 
        self.assertEqual(foobar_tip.branch, 'foobar')
 

	
 
    def test_new_head(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
 
        )
 

	
 
        newest_tip = self.imc.commit(
 
            message=u'Merged with %s' % foobar_tip.raw_id,
 
            author=u'joe',
 
            branch=self.backend_class.DEFAULT_BRANCH_NAME,
 
            parents=[newtip, foobar_tip],
 
        )
 

	
 
        self.assertEqual(newest_tip.branch,
 
            self.backend_class.DEFAULT_BRANCH_NAME)
 

	
 
    def test_branch_with_slash_in_name(self):
 
        self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n'))
 
        self.imc.commit(u'Branch with a slash!', author=u'joe',
 
            branch='issue/123')
 
        self.assertTrue('issue/123' in self.repo.branches)
 

	
 
    def test_branch_with_slash_in_name_and_similar_without(self):
 
        self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n'))
 
        self.imc.commit(u'Branch with a slash!', author=u'joe',
kallithea/tests/vcs/test_changesets.py
Show inline comments
 
# encoding: utf8
 
from __future__ import with_statement
 

	
 
import time
 
import datetime
 
from kallithea.lib import vcs
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 

	
 
from kallithea.lib.vcs.backends.base import BaseChangeset
 
from kallithea.lib.vcs.nodes import (
 
    FileNode, AddedFileNodesGenerator,
 
    ChangedFileNodesGenerator, RemovedFileNodesGenerator
 
)
 
from kallithea.lib.vcs.exceptions import (
 
    BranchDoesNotExistError, ChangesetDoesNotExistError,
 
    RepositoryError, EmptyRepositoryError
 
)
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.tests.vcs.conf import get_new_dir
 

	
 

	
 
class TestBaseChangeset(unittest.TestCase):
 

	
 
    def test_as_dict(self):
 
        changeset = BaseChangeset()
 
        changeset.id = 'ID'
 
        changeset.raw_id = 'RAW_ID'
 
        changeset.short_id = 'SHORT_ID'
 
        changeset.revision = 1009
 
        changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
 
        changeset.message = 'Message of a commit'
 
        changeset.author = 'Joe Doe <joe.doe@example.com>'
 
        changeset.added = [FileNode('foo/bar/baz'), FileNode('foobar')]
 
        changeset.changed = []
 
        changeset.removed = []
 
        self.assertEqual(changeset.as_dict(), {
 
            'id': 'ID',
 
            'raw_id': 'RAW_ID',
 
            'short_id': 'SHORT_ID',
 
            'revision': 1009,
 
            'date': datetime.datetime(2011, 1, 30, 1, 45),
 
            'message': 'Message of a commit',
 
            'author': {
 
                'name': 'Joe Doe',
 
                'email': 'joe.doe@example.com',
 
            },
 
            'added': ['foo/bar/baz', 'foobar'],
 
            'changed': [],
 
            'removed': [],
 
        })
 

	
 
class _ChangesetsWithCommitsTestCaseixin(_BackendTestMixin):
 
    recreate_repo_per_test = True
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_new_branch(self):
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertTrue('foobar' in self.repo.branches)
 
        self.assertEqual(foobar_tip.branch, 'foobar')
 
        # 'foobar' should be the only branch that contains the new commit
 
        self.assertNotEqual(*self.repo.branches.values())
 

	
 
    def test_new_head_in_default_branch(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
kallithea/tests/vcs/test_filenodes_unicode_path.py
Show inline comments
 
# encoding: utf8
 

	
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.tests.vcs.test_inmemchangesets import BackendBaseTestCase
 
from kallithea.tests.vcs.conf import SCM_TESTS
 

	
 

	
 
class FileNodeUnicodePathTestsMixin(object):
 

	
 
    fname = 'ąśðąęłąć.txt'
 
    ufname = (fname).decode('utf-8')
 

	
 
    def get_commits(self):
 
        self.nodes = [
 
            FileNode(self.fname, content='Foobar'),
 
        ]
 

	
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': self.nodes,
 
            },
 
        ]
 
        return commits
 

	
 
    def test_filenode_path(self):
 
        node = self.tip.get_node(self.fname)
 
        unode = self.tip.get_node(self.ufname)
 
        self.assertEqual(node, unode)
 

	
 

	
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s file node unicode path test' % alias).title()
 
        .split())
 
    bases = (FileNodeUnicodePathTestsMixin, BackendBaseTestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_getitem.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class GetitemTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getitem__last_item_is_tip(self):
 
        self.assertEqual(self.repo[-1], self.repo.get_changeset())
 

	
 
    def test__getitem__returns_correct_items(self):
 
        changesets = [self.repo[x] for x in xrange(len(self.repo.revisions))]
 
        self.assertEqual(changesets, list(self.repo.get_changesets()))
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s getitem test' % alias).title().split())
 
    bases = (GetitemTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_getslice.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class GetsliceTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getslice__last_item_is_tip(self):
 
        self.assertEqual(list(self.repo[-1:])[0], self.repo.get_changeset())
 

	
 
    def test__getslice__respects_start_index(self):
 
        self.assertEqual(list(self.repo[2:]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[2:]])
 

	
 
    def test__getslice__respects_negative_start_index(self):
 
        self.assertEqual(list(self.repo[-2:]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[-2:]])
 

	
 
    def test__getslice__respects_end_index(self):
 
        self.assertEqual(list(self.repo[:2]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[:2]])
 

	
 
    def test__getslice__respects_negative_end_index(self):
 
        self.assertEqual(list(self.repo[:-2]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[:-2]])
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s getslice test' % alias).title().split())
 
    bases = (GetsliceTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_git.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import mock
 
import datetime
 
import urllib2
 
from kallithea.lib.vcs.backends.git import GitRepository, GitChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import NodeKind, FileNode, DirNode, NodeState
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.model.scm import ScmModel
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
 

	
 

	
 
class GitRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_GIT_REPO_CLONE):
 
            self.fail('Cannot test git clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_GIT_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        self.assertRaises(RepositoryError, GitRepository, wrong_repo_path)
 

	
 
    def test_git_cmd_injection(self):
 
        repo_inject_path = TEST_GIT_REPO + '; echo "Cake";'
 
        with self.assertRaises(urllib2.URLError):
 
            # Should fail because URL will contain the parts after ; too
 
            urlerror_fail_repo = GitRepository(get_new_dir('injection-repo'), src_url=repo_inject_path, update_after_clone=True, create=True)
 

	
 
        with self.assertRaises(RepositoryError):
 
            # Should fail on direct clone call, which as of this writing does not happen outside of class
 
            clone_fail_repo = GitRepository(get_new_dir('injection-repo'), create=True)
 
            clone_fail_repo.clone(repo_inject_path, update_after_clone=True,)
 

	
 
        # Verify correct quoting of evil characters that should work on posix file systems
 
        if sys.platform == 'win32':
 
            # windows does not allow '"' in dir names
 
            tricky_path = get_new_dir("tricky-path-repo-$'`")
 
        else:
 
            tricky_path = get_new_dir("tricky-path-repo-$'\"`")
 
        successfully_cloned = GitRepository(tricky_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
 
        # Repo should have been created
 
        self.assertFalse(successfully_cloned._repo.bare)
 

	
 
        if sys.platform == 'win32':
 
            # windows does not allow '"' in dir names
 
            tricky_path_2 = get_new_dir("tricky-path-2-repo-$'`")
 
        else:
 
            tricky_path_2 = get_new_dir("tricky-path-2-repo-$'\"`")
 
        successfully_cloned2 = GitRepository(tricky_path_2, src_url=tricky_path, bare=True, create=True)
 
        # Repo should have been created and thus used correct quoting for clone
 
        self.assertTrue(successfully_cloned2._repo.bare)
 

	
 
        # Should pass because URL has been properly quoted
 
        successfully_cloned.pull(tricky_path_2)
 
        successfully_cloned2.fetch(tricky_path)
 

	
 
    def test_repo_create_with_spaces_in_path(self):
 
        repo_path = get_new_dir("path with spaces")
 
        repo = GitRepository(repo_path, src_url=None, bare=True, create=True)
 
        # Repo should have been created
 
        self.assertTrue(repo._repo.bare)
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = GitRepository(TEST_GIT_REPO)
 
        repo_clone = GitRepository(TEST_GIT_REPO_CLONE,
 
            src_url=TEST_GIT_REPO, create=True, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
 

	
 
    def test_repo_clone_with_spaces_in_path(self):
 
        repo_path = get_new_dir("path with spaces")
 
        successfully_cloned = GitRepository(repo_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
 
        # Repo should have been created
 
        self.assertFalse(successfully_cloned._repo.bare)
 

	
 
        successfully_cloned.pull(TEST_GIT_REPO)
 
        self.repo.fetch(repo_path)
 

	
 
    def test_repo_clone_without_create(self):
 
        self.assertRaises(RepositoryError, GitRepository,
 
            TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = GitRepository(TEST_GIT_REPO)
kallithea/tests/vcs/test_hg.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
from kallithea.lib.vcs.backends.hg import MercurialRepository, MercurialChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import NodeKind, NodeState
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_HG_REPO_CLONE, \
 
    TEST_HG_REPO_PULL
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
# Use only clean mercurial's ui
 
from kallithea.lib.vcs.utils.hgcompat import mercurial
 
mercurial.scmutil.rcpath()
 
if mercurial.scmutil._rcpath:
 
    mercurial.scmutil._rcpath = mercurial.scmutil._rcpath[:1]
 

	
 

	
 
class MercurialRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_HG_REPO_CLONE):
 
            self.fail('Cannot test mercurial clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_HG_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = MercurialRepository(TEST_HG_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        self.assertRaises(RepositoryError, MercurialRepository, wrong_repo_path)
 

	
 
    def test_unicode_path_repo(self):
 
        self.assertRaises(VCSError,lambda:MercurialRepository(u'iShouldFail'))
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update',
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 

	
 
        #check if current workdir was updated
 
        self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
 
                                                    + '_w_update',
 
                                                    'MANIFEST.in')), True,)
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update',
 
            src_url=TEST_HG_REPO, update_after_clone=False)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
 
                                                    + '_wo_update',
 
                                                    'MANIFEST.in')), False,)
 

	
 
    def test_pull(self):
 
        if os.path.exists(TEST_HG_REPO_PULL):
 
            self.fail('Cannot test mercurial pull command as location %s '
 
                      'already exists. You should manually remove it first'
 
                      % TEST_HG_REPO_PULL)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL, create=True)
 
        self.assertTrue(len(self.repo.revisions) > len(repo_new.revisions))
 

	
 
        repo_new.pull(self.repo.path)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL)
 
        self.assertTrue(len(self.repo.revisions) == len(repo_new.revisions))
 

	
 
    def test_revisions(self):
 
        # there are 21 revisions at bitbucket now
 
        # so we can assume they would be available from now on
 
        subset = set(['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
 
                 '6cba7170863a2411822803fa77a0a264f1310b35',
 
                 '56349e29c2af3ac913b28bde9a2c6154436e615b',
 
                 '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                 '6fff84722075f1607a30f436523403845f84cd9e',
 
                 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                 'be90031137367893f1c406e0a8683010fd115b79',
 
                 'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                 '84478366594b424af694a6c784cb991a16b87c21',
 
                 '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
kallithea/tests/vcs/test_inmemchangesets.py
Show inline comments
 
# encoding: utf8
 
"""
 
Tests so called "in memory changesets" commit API of vcs.
 
"""
 
from __future__ import with_statement
 

	
 
import time
 
import datetime
 

	
 
from kallithea.lib import vcs
 
from kallithea.tests.vcs.conf import SCM_TESTS, get_new_dir
 
from kallithea.lib.vcs.exceptions import EmptyRepositoryError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyAddedError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyExistsError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyRemovedError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyChangedError
 
from kallithea.lib.vcs.exceptions import NodeDoesNotExistError
 
from kallithea.lib.vcs.exceptions import NodeNotChangedError
 
from kallithea.lib.vcs.nodes import DirNode
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.lib.vcs.utils import safe_unicode
 

	
 

	
 
class InMemoryChangesetTestMixin(object):
 
    """
 
    This is a backend independent test case class which should be created
 
    with ``type`` method.
 

	
 
    It is required to set following attributes at subclass:
 

	
 
    - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
 
    - ``repo_path``: path to the repository which would be created for set of
 
      tests
 
    """
 

	
 
    def get_backend(self):
 
        return vcs.get_backend(self.backend_alias)
 

	
 
    def setUp(self):
 
        Backend = self.get_backend()
 
        self.repo_path = get_new_dir(str(time.time()))
 
        self.repo = Backend(self.repo_path, create=True)
 
        self.imc = self.repo.in_memory_changeset
 
        self.nodes = [
 
            FileNode('foobar', content='Foo & bar'),
 
            FileNode('foobar2', content='Foo & bar, doubled!'),
 
            FileNode('foo bar with spaces', content=''),
 
            FileNode('foo/bar/baz', content='Inside'),
 
            FileNode('foo/bar/file.bin', content='\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'),
 
        ]
 

	
 
    def test_add(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        for node in to_add:
 
            self.imc.add(node)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_in_bulk(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        self.imc.add(*to_add)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_actually_adds_all_nodes_at_second_commit_too(self):
 
        self.imc.add(FileNode('foo/bar/image.png', content='\0'))
 
        self.imc.add(FileNode('foo/README.txt', content='readme!'))
 
        changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
 
        self.assertTrue(isinstance(changeset.get_node('foo'), DirNode))
 
        self.assertTrue(isinstance(changeset.get_node('foo/bar'), DirNode))
 
        self.assertEqual(changeset.get_node('foo/bar/image.png').content, '\0')
 
        self.assertEqual(changeset.get_node('foo/README.txt').content, 'readme!')
 

	
 
        # commit some more files again
kallithea/tests/vcs/test_nodes.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import stat
 
from kallithea.lib.vcs.nodes import DirNode
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.nodes import Node
 
from kallithea.lib.vcs.nodes import NodeError
 
from kallithea.lib.vcs.nodes import NodeKind
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class NodeBasicTest(unittest.TestCase):
 

	
 
    def test_init(self):
 
        """
 
        Cannot innitialize Node objects with path with slash at the beginning.
 
        """
 
        wrong_paths = (
 
            '/foo',
 
            '/foo/bar'
 
        )
 
        for path in wrong_paths:
 
            self.assertRaises(NodeError, Node, path, NodeKind.FILE)
 

	
 
        wrong_paths = (
 
            '/foo/',
 
            '/foo/bar/'
 
        )
 
        for path in wrong_paths:
 
            self.assertRaises(NodeError, Node, path, NodeKind.DIR)
 

	
 
    def test_name(self):
 
        node = Node('', NodeKind.DIR)
 
        self.assertEqual(node.name, '')
 

	
 
        node = Node('path', NodeKind.FILE)
 
        self.assertEqual(node.name, 'path')
 

	
 
        node = Node('path/', NodeKind.DIR)
 
        self.assertEqual(node.name, 'path')
 

	
 
        node = Node('some/path', NodeKind.FILE)
 
        self.assertEqual(node.name, 'path')
 

	
 
        node = Node('some/path/', NodeKind.DIR)
 
        self.assertEqual(node.name, 'path')
 

	
 
    def test_root_node(self):
 
        self.assertRaises(NodeError, Node, '', NodeKind.FILE)
 

	
 
    def test_kind_setter(self):
 
        node = Node('', NodeKind.DIR)
 
        self.assertRaises(NodeError, setattr, node, 'kind', NodeKind.FILE)
 

	
 
    def _test_parent_path(self, node_path, expected_parent_path):
 
        """
 
        Tests if node's parent path are properly computed.
 
        """
 
        node = Node(node_path, NodeKind.DIR)
 
        parent_path = node.get_parent_path()
 
        self.assertTrue(parent_path.endswith('/') or \
 
            node.is_root() and parent_path == '')
 
        self.assertEqual(parent_path, expected_parent_path,
 
            "Node's path is %r and parent path is %r but should be %r"
 
            % (node.path, parent_path, expected_parent_path))
 

	
 
    def test_parent_path(self):
 
        test_paths = (
 
            # (node_path, expected_parent_path)
 
            ('', ''),
 
            ('some/path/', 'some/'),
 
            ('some/longer/path/', 'some/longer/'),
 
        )
 
        for node_path, expected_parent_path in test_paths:
 
            self._test_parent_path(node_path, expected_parent_path)
 

	
 
    '''
 
    def _test_trailing_slash(self, path):
 
        if not path.endswith('/'):
 
            self.fail("Trailing slash tests needs paths to end with slash")
 
        for kind in NodeKind.FILE, NodeKind.DIR:
 
            self.assertRaises(NodeError, Node, path=path, kind=kind)
 

	
 
    def test_trailing_slash(self):
 
        for path in ('/', 'foo/', 'foo/bar/', 'foo/bar/biz/'):
 
            self._test_trailing_slash(path)
 
    '''
 

	
 
    def test_is_file(self):
 
        node = Node('any', NodeKind.FILE)
 
        self.assertTrue(node.is_file())
 

	
 
        node = FileNode('any')
 
        self.assertTrue(node.is_file())
 
        self.assertRaises(AttributeError, getattr, node, 'nodes')
 

	
 
    def test_is_dir(self):
kallithea/tests/vcs/test_repository.py
Show inline comments
 
from __future__ import with_statement
 
import datetime
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.tests.vcs.conf import TEST_USER_CONFIG_FILE
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 

	
 

	
 
class RepositoryBaseTest(_BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return super(RepositoryBaseTest, cls)._get_commits()[:1]
 

	
 
    def test_get_config_value(self):
 
        self.assertEqual(self.repo.get_config_value('universal', 'foo',
 
            TEST_USER_CONFIG_FILE), 'bar')
 

	
 
    def test_get_config_value_defaults_to_None(self):
 
        self.assertEqual(self.repo.get_config_value('universal', 'nonexist',
 
            TEST_USER_CONFIG_FILE), None)
 

	
 
    def test_get_user_name(self):
 
        self.assertEqual(self.repo.get_user_name(TEST_USER_CONFIG_FILE),
 
            'Foo Bar')
 

	
 
    def test_get_user_email(self):
 
        self.assertEqual(self.repo.get_user_email(TEST_USER_CONFIG_FILE),
 
            'foo.bar@example.com')
 

	
 
    def test_repo_equality(self):
 
        self.assertTrue(self.repo == self.repo)
 

	
 
    def test_repo_equality_broken_object(self):
 
        import copy
 
        _repo = copy.copy(self.repo)
 
        delattr(_repo, 'path')
 
        self.assertTrue(self.repo != _repo)
 

	
 
    def test_repo_equality_other_object(self):
 
        class dummy(object):
 
            path = self.repo.path
 
        self.assertTrue(self.repo != dummy())
 

	
 

	
 
class RepositoryGetDiffTest(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='foobar'),
 
                    FileNode('foobar2', content='foobar2'),
 
                ],
 
            },
 
            {
 
                'message': 'Changed foobar, added foobar3',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('foobar3', content='foobar3'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'FOOBAR'),
 
                ],
 
            },
 
            {
 
                'message': 'Removed foobar, changed foobar3',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'changed': [
 
                    FileNode('foobar3', content='FOOBAR\nFOOBAR\nFOOBAR\n'),
 
                ],
 
                'removed': [FileNode('foobar')],
 
            },
 
        ]
 
        return commits
 

	
 
    def test_raise_for_wrong(self):
 
        with self.assertRaises(ChangesetDoesNotExistError):
 
            self.repo.get_diff('a' * 40, 'b' * 40)
 

	
 

	
 
class GitRepositoryGetDiffTest(RepositoryGetDiffTest, unittest.TestCase):
 
    backend_alias = 'git'
 

	
 
    def test_initial_commit_diff(self):
 
        initial_rev = self.repo.revisions[0]
 
        self.assertEqual(self.repo.get_diff(self.repo.EMPTY_CHANGESET, initial_rev), '''diff --git a/foobar b/foobar
 
new file mode 100644
kallithea/tests/vcs/test_tags.py
Show inline comments
 
from __future__ import with_statement
 

	
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.lib.vcs.exceptions import TagAlreadyExistError
 
from kallithea.lib.vcs.exceptions import TagDoesNotExistError
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class TagsTestCaseMixin(_BackendTestMixin):
 

	
 
    def test_new_tag(self):
 
        tip = self.repo.get_changeset()
 
        tagsize = len(self.repo.tags)
 
        tag = self.repo.tag('last-commit', 'joe', tip.raw_id)
 

	
 
        self.assertEqual(len(self.repo.tags), tagsize + 1)
 
        for top, dirs, files in tip.walk():
 
            self.assertEqual(top, tag.get_node(top.path))
 

	
 
    def test_tag_already_exist(self):
 
        tip = self.repo.get_changeset()
 
        self.repo.tag('last-commit', 'joe', tip.raw_id)
 

	
 
        self.assertRaises(TagAlreadyExistError,
 
            self.repo.tag, 'last-commit', 'joe', tip.raw_id)
 

	
 
        chset = self.repo.get_changeset(0)
 
        self.assertRaises(TagAlreadyExistError,
 
            self.repo.tag, 'last-commit', 'jane', chset.raw_id)
 

	
 
    def test_remove_tag(self):
 
        tip = self.repo.get_changeset()
 
        self.repo.tag('last-commit', 'joe', tip.raw_id)
 
        tagsize = len(self.repo.tags)
 

	
 
        self.repo.remove_tag('last-commit', user='evil joe')
 
        self.assertEqual(len(self.repo.tags), tagsize - 1)
 

	
 
    def test_remove_tag_which_does_not_exist(self):
 
        self.assertRaises(TagDoesNotExistError,
 
            self.repo.remove_tag, 'last-commit', user='evil joe')
 

	
 
    def test_name_with_slash(self):
 
        self.repo.tag('19/10/11', 'joe')
 
        self.assertTrue('19/10/11' in self.repo.tags)
 
        self.repo.tag('11', 'joe')
 
        self.assertTrue('11' in self.repo.tags)
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s tags test' % alias).title().split())
 
    bases = (TagsTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_utils.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
import mock
 
import time
 
import shutil
 
import tempfile
 
import datetime
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.lib.vcs.utils.paths import get_dirs_for_path
 
from kallithea.lib.vcs.utils.helpers import get_dict_for_attrs
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.helpers import get_scms_for_path
 
from kallithea.lib.vcs.utils.helpers import get_total_seconds
 
from kallithea.lib.vcs.utils.helpers import parse_changesets
 
from kallithea.lib.vcs.utils.helpers import parse_datetime
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.vcs.utils.paths import get_user_home
 
from kallithea.lib.vcs.exceptions import VCSError
 

	
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH
 

	
 

	
 
class PathsTest(unittest.TestCase):
 

	
 
    def _test_get_dirs_for_path(self, path, expected):
 
        """
 
        Tests if get_dirs_for_path returns same as expected.
 
        """
 
        expected = sorted(expected)
 
        result = sorted(get_dirs_for_path(path))
 
        self.assertEqual(result, expected,
 
            msg="%s != %s which was expected result for path %s"
 
            % (result, expected, path))
 

	
 
    def test_get_dirs_for_path(self):
 
        path = 'foo/bar/baz/file'
 
        paths_and_results = (
 
            ('foo/bar/baz/file', ['foo', 'foo/bar', 'foo/bar/baz']),
 
            ('foo/bar/', ['foo', 'foo/bar']),
 
            ('foo/bar', ['foo']),
 
        )
 
        for path, expected in paths_and_results:
 
            self._test_get_dirs_for_path(path, expected)
 

	
 

	
 
    def test_get_scm(self):
 
        self.assertEqual(('hg', TEST_HG_REPO), get_scm(TEST_HG_REPO))
 
        self.assertEqual(('git', TEST_GIT_REPO), get_scm(TEST_GIT_REPO))
 

	
 
    def test_get_two_scms_for_path(self):
 
        multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo-2')
 
        if os.path.isdir(multialias_repo_path):
 
            shutil.rmtree(multialias_repo_path)
 

	
 
        os.mkdir(multialias_repo_path)
 

	
 
        self.assertRaises(VCSError, get_scm, multialias_repo_path)
 

	
 
    def test_get_scm_error_path(self):
 
        self.assertRaises(VCSError, get_scm, 'err')
 

	
 
    def test_get_scms_for_path(self):
 
        dirpath = tempfile.gettempdir()
 
        new = os.path.join(dirpath, 'vcs-scms-for-path-%s' % time.time())
 
        os.mkdir(new)
 
        self.assertEqual(get_scms_for_path(new), [])
 

	
 
        os.mkdir(os.path.join(new, '.tux'))
 
        self.assertEqual(get_scms_for_path(new), [])
 

	
 
        os.mkdir(os.path.join(new, '.git'))
 
        self.assertEqual(set(get_scms_for_path(new)), set(['git']))
 

	
 
        os.mkdir(os.path.join(new, '.hg'))
 
        self.assertEqual(set(get_scms_for_path(new)), set(['git', 'hg']))
 

	
 

	
 
class TestParseChangesets(unittest.TestCase):
 

	
 
    def test_main_is_returned_correctly(self):
 
        self.assertEqual(parse_changesets('123456'), {
 
            'start': None,
 
            'main': '123456',
 
            'end': None,
 
        })
 

	
 
    def test_start_is_returned_correctly(self):
 
        self.assertEqual(parse_changesets('aaabbb..'), {
 
            'start': 'aaabbb',
 
            'main': None,
 
            'end': None,
 
        })
 

	
 
    def test_end_is_returned_correctly(self):
 
        self.assertEqual(parse_changesets('..cccddd'), {
 
            'start': None,
kallithea/tests/vcs/test_utils_filesize.py
Show inline comments
 
from __future__ import with_statement
 

	
 
from kallithea.lib.vcs.utils.filesize import filesizeformat
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class TestFilesizeformat(unittest.TestCase):
 

	
 
    def test_bytes(self):
 
        self.assertEqual(filesizeformat(10), '10 B')
 

	
 
    def test_kilobytes(self):
 
        self.assertEqual(filesizeformat(1024 * 2), '2 KB')
 

	
 
    def test_megabytes(self):
 
        self.assertEqual(filesizeformat(1024 * 1024 * 2.3), '2.3 MB')
 

	
 
    def test_gigabytes(self):
 
        self.assertEqual(filesizeformat(1024 * 1024 * 1024 * 12.92), '12.92 GB')
 

	
 
    def test_that_function_respects_sep_paramtere(self):
 
        self.assertEqual(filesizeformat(1, ''), '1B')
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_vcs.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
import shutil
 

	
 
from kallithea.lib.vcs import VCSError, get_repo, get_backend
 
from kallithea.lib.vcs.backends.hg import MercurialRepository
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH
 

	
 

	
 

	
 
class VCSTest(unittest.TestCase):
 
    """
 
    Tests for main module's methods.
 
    """
 

	
 
    def test_get_backend(self):
 
        hg = get_backend('hg')
 
        self.assertEqual(hg, MercurialRepository)
 

	
 
    def test_alias_detect_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 
        self.assertEqual('hg',repo.alias)
 

	
 
    def test_alias_detect_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 
        self.assertEqual('git',repo.alias)
 

	
 
    def test_wrong_alias(self):
 
        alias = 'wrong_alias'
 
        self.assertRaises(VCSError, get_backend, alias)
 

	
 
    def test_get_repo(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 

	
 
        self.assertEqual(repo.__class__, get_repo(path, alias).__class__)
 
        self.assertEqual(repo.path, get_repo(path, alias).path)
 

	
 
    def test_get_repo_autoalias_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 

	
 
        self.assertEqual(repo.__class__, get_repo(path).__class__)
 
        self.assertEqual(repo.path, get_repo(path).path)
 

	
 
    def test_get_repo_autoalias_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 

	
 
        self.assertEqual(repo.__class__, get_repo(path).__class__)
 
        self.assertEqual(repo.path, get_repo(path).path)
 

	
 

	
 
    def test_get_repo_err(self):
 
        blank_repo_path = os.path.join(TEST_TMP_PATH, 'blank-error-repo')
 
        if os.path.isdir(blank_repo_path):
 
            shutil.rmtree(blank_repo_path)
 

	
 
        os.mkdir(blank_repo_path)
 
        self.assertRaises(VCSError, get_repo, blank_repo_path)
 
        self.assertRaises(VCSError, get_repo, blank_repo_path + 'non_existing')
 

	
 
    def test_get_repo_multialias(self):
 
        multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo')
 
        if os.path.isdir(multialias_repo_path):
 
            shutil.rmtree(multialias_repo_path)
 

	
 
        os.mkdir(multialias_repo_path)
 

	
 
        os.mkdir(os.path.join(multialias_repo_path, '.git'))
 
        os.mkdir(os.path.join(multialias_repo_path, '.hg'))
 
        self.assertRaises(VCSError, get_repo, multialias_repo_path)
kallithea/tests/vcs/test_workdirs.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 

	
 

	
 
class WorkdirTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': u'Initial commit',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='Foobar'),
 
                    FileNode('foobar2', content='Foobar II'),
 
                    FileNode('foo/bar/baz', content='baz here!'),
 
                ],
 
            },
 
            {
 
                'message': u'Changes...',
 
                'author': u'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('some/new.txt', content='news...'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'Foobar I'),
 
                ],
 
                'removed': [],
 
            },
 
        ]
 
        return commits
 

	
 
    def test_get_branch_for_default_branch(self):
 
        self.assertEqual(self.repo.workdir.get_branch(),
 
            self.repo.DEFAULT_BRANCH_NAME)
 

	
 
    def test_get_branch_after_adding_one(self):
 
        self.imc.add(FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertEqual(self.repo.workdir.get_branch(), self.default_branch)
 

	
 
    def test_get_changeset(self):
 
        old_head = self.repo.get_changeset()
 
        self.imc.add(FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        head = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertEqual(self.repo.workdir.get_branch(), self.default_branch)
 
        self.repo.workdir.checkout_branch('foobar')
 
        self.assertEqual(self.repo.workdir.get_changeset(), head)
 

	
 
        # Make sure that old head is still there after update to defualt branch
 
        self.repo.workdir.checkout_branch(self.default_branch)
 
        self.assertEqual(self.repo.workdir.get_changeset(), old_head)
 

	
 
    def test_checkout_branch(self):
 
        from kallithea.lib.vcs.exceptions import BranchDoesNotExistError
 
        # first, 'foobranch' does not exist.
 
        self.assertRaises(BranchDoesNotExistError, self.repo.workdir.checkout_branch,
 
                          branch='foobranch')
 
        # create new branch 'foobranch'.
 
        self.imc.add(FileNode('file1', content='blah'))
 
        self.imc.commit(message=u'asd', author=u'john', branch='foobranch')
 
        # go back to the default branch
 
        self.repo.workdir.checkout_branch()
 
        self.assertEqual(self.repo.workdir.get_branch(), self.backend_class.DEFAULT_BRANCH_NAME)
 
        # checkout 'foobranch'
 
        self.repo.workdir.checkout_branch('foobranch')
 
        self.assertEqual(self.repo.workdir.get_branch(), 'foobranch')
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s branch test' % alias).title().split())
 
    bases = (WorkdirTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
0 comments (0 inline, 0 general)