Changeset - 431689d7f37d
[Not reviewed]
default
0 41 0
Søren Løvborg - 10 years ago 2015-08-31 17:42:57
sorenl@unity3d.com
remove vestiges of Python 2.5 support

We only support Python 2.6 and 2.7; hence we do not need to import
with-statement support from __future__.
41 files changed with 0 insertions and 41 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_api.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.kallithea_api
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Api CLI client for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 3, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import sys
 
import argparse
 

	
 
from kallithea.bin.base import json, api_call, RcConf, FORMAT_JSON, FORMAT_PRETTY
 

	
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-api [-h] [--format=FORMAT] [--apikey=APIKEY] [--apihost=APIHOST] "
 
      "[--config=CONFIG] [--save-config] "
 
      "METHOD <key:val> <key2:val> ...\n"
 
      "Create config file: kallithea-api --apikey=<key> --apihost=http://your.kallithea.server --save-config"
 
    )
 

	
 
    parser = argparse.ArgumentParser(description='Kallithea API cli',
 
                                     usage=usage)
 

	
 
    ## config
 
    group = parser.add_argument_group('config')
 
    group.add_argument('--apikey', help='api access key')
 
    group.add_argument('--apihost', help='api host')
 
    group.add_argument('--config', help='config file')
 
    group.add_argument('--save-config', action='store_true', help='save the given config into a file')
 

	
 
    group = parser.add_argument_group('API')
 
    group.add_argument('method', metavar='METHOD', nargs='?', type=str, default=None,
 
            help='API method name to call followed by key:value attributes',
 
    )
 
    group.add_argument('--format', dest='format', type=str,
 
            help='output format default: `%s` can '
 
                 'be also `%s`' % (FORMAT_PRETTY, FORMAT_JSON),
 
            default=FORMAT_PRETTY
 
    )
 
    args, other = parser.parse_known_args()
 
    return parser, args, other
 

	
 

	
 
def main(argv=None):
 
    """
 
    Main execution function for cli
 

	
 
    :param argv:
 
    """
 
    if argv is None:
 
        argv = sys.argv
 

	
 
    conf = None
 
    parser, args, other = argparser(argv)
 

	
 
    api_credentials_given = (args.apikey and args.apihost)
 
    if args.save_config:
 
        if not api_credentials_given:
 
            raise parser.error('--save-config requires --apikey and --apihost')
 
        conf = RcConf(config_location=args.config,
 
                      autocreate=True, config={'apikey': args.apikey,
 
                                               'apihost': args.apihost})
 
        sys.exit()
 

	
 
    if not conf:
 
        conf = RcConf(config_location=args.config, autoload=True)
 
        if not conf:
 
            if not api_credentials_given:
 
                parser.error('Could not find config file and missing '
 
                             '--apikey or --apihost in params')
 

	
 
    apikey = args.apikey or conf['apikey']
 
    apihost = args.apihost or conf['apihost']
 
    method = args.method
 

	
 
    # if we don't have method here it's an error
 
    if not method:
 
        parser.error('Please specify method name')
 

	
 
    try:
 
        margs = dict(map(lambda s: s.split(':', 1), other))
 
    except ValueError:
 
        sys.stderr.write('Error parsing arguments \n')
 
        sys.exit()
 
    if args.format == FORMAT_PRETTY:
 
        print 'Calling method %s => %s' % (method, apihost)
 

	
 
    json_resp = api_call(apikey, apihost, method, **margs)
 
    error_prefix = ''
 
    if json_resp['error']:
 
        error_prefix = 'ERROR:'
 
        json_data = json_resp['error']
 
    else:
 
        json_data = json_resp['result']
 
    if args.format == FORMAT_JSON:
 
        print json.dumps(json_data)
 
    elif args.format == FORMAT_PRETTY:
 
        print 'Server response \n%s%s' % (
 
            error_prefix, json.dumps(json_data, indent=4, sort_keys=True)
 
        )
 
    return 0
 

	
 
if __name__ == '__main__':
 
    sys.exit(main(sys.argv))
kallithea/bin/kallithea_config.py
Show inline comments
 
#!/usr/bin/env python2
 

	
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.kallithea_config
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
configuration generator for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 18, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
from __future__ import with_statement
 
import os
 
import sys
 
import uuid
 
import argparse
 
from mako.template import Template
 
TMPL = 'template.ini.mako'
 
here = os.path.dirname(os.path.abspath(__file__))
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-config [-h] [--filename=FILENAME] [--template=TEMPLATE] \n"
 
      "VARS optional specify extra template variable that will be available in "
 
      "template. Use comma separated key=val format eg.\n"
 
      "key1=val1,port=5000,host=127.0.0.1,elements='a\,b\,c'\n"
 
    )
 

	
 
    parser = argparse.ArgumentParser(
 
        description='Kallithea CONFIG generator with variable replacement',
 
        usage=usage
 
    )
 

	
 
    ## config
 
    group = parser.add_argument_group('CONFIG')
 
    group.add_argument('--filename', help='Output ini filename.')
 
    group.add_argument('--template', help='Mako template file to use instead of '
 
                                          'the default builtin template')
 
    group.add_argument('--raw', help='Store given mako template as raw without '
 
                                     'parsing. Use this to create custom template '
 
                                     'initially', action='store_true')
 
    group.add_argument('--show-defaults', help='Show all default variables for '
 
                                               'builtin template', action='store_true')
 
    args, other = parser.parse_known_args()
 
    return parser, args, other
 

	
 

	
 
def _escape_split(text, sep):
 
    """
 
    Allows for escaping of the separator: e.g. arg='foo\, bar'
 

	
 
    It should be noted that the way bash et. al. do command line parsing, those
 
    single quotes are required. a shameless ripoff from fabric project.
 

	
 
    """
 
    escaped_sep = r'\%s' % sep
 

	
 
    if escaped_sep not in text:
 
        return text.split(sep)
 

	
 
    before, _, after = text.partition(escaped_sep)
 
    startlist = before.split(sep)  # a regular split is fine here
 
    unfinished = startlist[-1]
 
    startlist = startlist[:-1]
 

	
 
    # recurse because there may be more escaped separators
 
    endlist = _escape_split(after, sep)
 

	
 
    # finish building the escaped value. we use endlist[0] becaue the first
 
    # part of the string sent in recursion is the rest of the escaped value.
 
    unfinished += sep + endlist[0]
 

	
 
    return startlist + [unfinished] + endlist[1:]  # put together all the parts
 

	
 
def _run(argv):
 
    parser, args, other = argparser(argv)
 
    if not len(sys.argv) > 1:
 
        print parser.print_help()
 
        sys.exit(0)
 
    # defaults that can be overwritten by arguments
 
    tmpl_stored_args = {
 
        'http_server': 'waitress',
 
        'lang': 'en',
 
        'database_engine': 'sqlite',
 
        'host': '127.0.0.1',
 
        'port': 5000,
 
        'error_aggregation_service': None,
 
    }
 
    if other:
 
        # parse arguments, we assume only first is correct
 
        kwargs = {}
 
        for el in _escape_split(other[0], ','):
 
            kv = _escape_split(el, '=')
 
            if len(kv) == 2:
 
                k, v = kv
 
                kwargs[k] = v
 
        # update our template stored args
 
        tmpl_stored_args.update(kwargs)
 

	
 
    # use default that cannot be replaced
 
    tmpl_stored_args.update({
 
        'uuid': lambda: uuid.uuid4().hex,
 
        'here': os.path.abspath(os.curdir),
 
    })
 
    if args.show_defaults:
 
        for k,v in tmpl_stored_args.iteritems():
 
            print '%s=%s' % (k, v)
 
        sys.exit(0)
 
    try:
 
        # built in template
 
        tmpl_file = os.path.join(here, TMPL)
 
        if args.template:
 
            tmpl_file = args.template
 

	
 
        with open(tmpl_file, 'rb') as f:
 
            tmpl_data = f.read().decode('utf-8')
 
            if args.raw:
 
                tmpl = tmpl_data
 
            else:
 
                tmpl = Template(tmpl_data).render(**tmpl_stored_args)
 
        with open(args.filename, 'wb') as f:
 
            f.write(tmpl.encode('utf-8'))
 
        print 'Wrote new config file in %s' % (os.path.abspath(args.filename))
 

	
 
    except Exception:
 
        from mako import exceptions
 
        print exceptions.text_error_template().render()
 

	
 
def main(argv=None):
 
    """
 
    Main execution function for cli
 

	
 
    :param argv:
 
    """
 
    if argv is None:
 
        argv = sys.argv
 

	
 
    return _run(argv)
 

	
 

	
 
if __name__ == '__main__':
 
    sys.exit(main(sys.argv))
kallithea/bin/kallithea_gist.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.kallithea_gist
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Gist CLI client for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import sys
 
import stat
 
import argparse
 
import fileinput
 

	
 
from kallithea.bin.base import json, api_call, RcConf, FORMAT_JSON, FORMAT_PRETTY
 

	
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-gist [-h] [--format=FORMAT] [--apikey=APIKEY] [--apihost=APIHOST] "
 
      "[--config=CONFIG] [--save-config] [GIST OPTIONS] "
 
      "[filename or stdin use - for terminal stdin ]\n"
 
      "Create config file: kallithea-gist --apikey=<key> --apihost=http://your.kallithea.server --save-config"
 
    )
 

	
 
    parser = argparse.ArgumentParser(description='Kallithea Gist cli',
 
                                     usage=usage)
 

	
 
    ## config
 
    group = parser.add_argument_group('config')
 
    group.add_argument('--apikey', help='api access key')
 
    group.add_argument('--apihost', help='api host')
 
    group.add_argument('--config', help='config file path DEFAULT: ~/.config/kallithea')
 
    group.add_argument('--save-config', action='store_true',
 
                       help='save the given config into a file')
 

	
 
    group = parser.add_argument_group('GIST')
 
    group.add_argument('-p', '--private', action='store_true',
 
                       help='create private Gist')
 
    group.add_argument('-f', '--filename',
 
                       help='set uploaded gist filename, '
 
                            'also defines syntax highlighting')
 
    group.add_argument('-d', '--description', help='Gist description')
 
    group.add_argument('-l', '--lifetime', metavar='MINUTES',
 
                       help='gist lifetime in minutes, -1 (DEFAULT) is forever')
 
    group.add_argument('--format', dest='format', type=str,
 
                       help='output format DEFAULT: `%s` can '
 
                       'be also `%s`' % (FORMAT_PRETTY, FORMAT_JSON),
 
            default=FORMAT_PRETTY
 
    )
 
    args, other = parser.parse_known_args()
 
    return parser, args, other
 

	
 

	
 
def _run(argv):
 
    conf = None
 
    parser, args, other = argparser(argv)
 

	
 
    api_credentials_given = (args.apikey and args.apihost)
 
    if args.save_config:
 
        if not api_credentials_given:
 
            raise parser.error('--save-config requires --apikey and --apihost')
 
        conf = RcConf(config_location=args.config,
 
                      autocreate=True, config={'apikey': args.apikey,
 
                                               'apihost': args.apihost})
 
        sys.exit()
 

	
 
    if not conf:
 
        conf = RcConf(config_location=args.config, autoload=True)
 
        if not conf:
 
            if not api_credentials_given:
 
                parser.error('Could not find config file and missing '
 
                             '--apikey or --apihost in params')
 

	
 
    apikey = args.apikey or conf['apikey']
 
    host = args.apihost or conf['apihost']
 
    DEFAULT_FILENAME = 'gistfile1.txt'
 
    if other:
 
        # skip multifiles for now
 
        filename = other[0]
 
        if filename == '-':
 
            filename = DEFAULT_FILENAME
 
            gist_content = ''
 
            for line in fileinput.input('-'):
 
                gist_content += line
 
        else:
 
            with open(filename, 'rb') as f:
 
                gist_content = f.read()
 

	
 
    else:
 
        filename = DEFAULT_FILENAME
 
        gist_content = None
 
        # little bit hacky but cross platform check where the
 
        # stdin comes from we skip the terminal case it can be handled by '-'
 
        mode = os.fstat(0).st_mode
 
        if stat.S_ISFIFO(mode):
 
            # "stdin is piped"
 
            gist_content = sys.stdin.read()
 
        elif stat.S_ISREG(mode):
 
            # "stdin is redirected"
 
            gist_content = sys.stdin.read()
 
        else:
 
            # "stdin is terminal"
 
            pass
 

	
 
    # make sure we don't upload binary stuff
 
    if gist_content and '\0' in gist_content:
 
        raise Exception('Error: binary files upload is not possible')
 

	
 
    filename = os.path.basename(args.filename or filename)
 
    if gist_content:
 
        files = {
 
            filename: {
 
                'content': gist_content,
 
                'lexer': None
 
            }
 
        }
 

	
 
        margs = dict(
 
            lifetime=args.lifetime,
 
            description=args.description,
 
            gist_type='private' if args.private else 'public',
 
            files=files
 
        )
 

	
 
        json_data = api_call(apikey, host, 'create_gist', **margs)['result']
 
        if args.format == FORMAT_JSON:
 
            print json.dumps(json_data)
 
        elif args.format == FORMAT_PRETTY:
 
            print json_data
 
            print 'Created %s gist %s' % (json_data['gist']['type'],
 
                                          json_data['gist']['url'])
 
    return 0
 

	
 

	
 
def main(argv=None):
 
    """
 
    Main execution function for cli
 

	
 
    :param argv:
 
    """
 
    if argv is None:
 
        argv = sys.argv
 

	
 
    try:
 
        return _run(argv)
 
    except Exception as e:
 
        print e
 
        return 1
 

	
 

	
 
if __name__ == '__main__':
 
    sys.exit(main(sys.argv))
kallithea/config/routing.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Routes configuration
 

	
 
The more specific and detailed routes should be defined first so they
 
may take precedent over the more generic routes. For more information
 
refer to the routes manual at http://routes.groovie.org/docs/
 
"""
 

	
 
from __future__ import with_statement
 
from routes import Mapper
 

	
 
# prefix for non repository related links needs to be prefixed with `/`
 
ADMIN_PREFIX = '/_admin'
 

	
 

	
 
def make_map(config):
 
    """Create, configure and return the routes Mapper"""
 
    rmap = Mapper(directory=config['pylons.paths']['controllers'],
 
                  always_scan=config['debug'])
 
    rmap.minimization = False
 
    rmap.explicit = False
 

	
 
    from kallithea.lib.utils import (is_valid_repo, is_valid_repo_group,
 
                                     get_repo_by_id)
 

	
 
    def check_repo(environ, match_dict):
 
        """
 
        check for valid repository for proper 404 handling
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        repo_name = match_dict.get('repo_name')
 

	
 
        if match_dict.get('f_path'):
 
            #fix for multiple initial slashes that causes errors
 
            match_dict['f_path'] = match_dict['f_path'].lstrip('/')
 

	
 
        by_id_match = get_repo_by_id(repo_name)
 
        if by_id_match:
 
            repo_name = by_id_match
 
            match_dict['repo_name'] = repo_name
 

	
 
        return is_valid_repo(repo_name, config['base_path'])
 

	
 
    def check_group(environ, match_dict):
 
        """
 
        check for valid repository group for proper 404 handling
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        repo_group_name = match_dict.get('group_name')
 
        return is_valid_repo_group(repo_group_name, config['base_path'])
 

	
 
    def check_group_skip_path(environ, match_dict):
 
        """
 
        check for valid repository group for proper 404 handling, but skips
 
        verification of existing path
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        repo_group_name = match_dict.get('group_name')
 
        return is_valid_repo_group(repo_group_name, config['base_path'],
 
                                   skip_path_check=True)
 

	
 
    def check_user_group(environ, match_dict):
 
        """
 
        check for valid user group for proper 404 handling
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        return True
 

	
 
    def check_int(environ, match_dict):
 
        return match_dict.get('id').isdigit()
 

	
 
    # The ErrorController route (handles 404/500 error pages); it should
 
    # likely stay at the top, ensuring it can always be resolved
 
    rmap.connect('/error/{action}', controller='error')
 
    rmap.connect('/error/{action}/{id}', controller='error')
 

	
 
    #==========================================================================
 
    # CUSTOM ROUTES HERE
 
    #==========================================================================
 

	
 
    #MAIN PAGE
 
    rmap.connect('home', '/', controller='home', action='index')
 
    rmap.connect('about', '/about', controller='home', action='about')
 
    rmap.connect('repo_switcher_data', '/_repos', controller='home',
 
                 action='repo_switcher_data')
 

	
 
    rmap.connect('rst_help',
 
                 "http://docutils.sourceforge.net/docs/user/rst/quickref.html",
 
                 _static=True)
 
    rmap.connect('kallithea_project_url', "https://kallithea-scm.org/", _static=True)
 
    rmap.connect('issues_url', 'https://bitbucket.org/conservancy/kallithea/issues', _static=True)
 

	
 
    #ADMIN REPOSITORY ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/repos') as m:
 
        m.connect("repos", "/repos",
 
                  action="create", conditions=dict(method=["POST"]))
 
        m.connect("repos", "/repos",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("new_repo", "/create_repository",
 
                  action="create_repository", conditions=dict(method=["GET"]))
 
        m.connect("put_repo", "/repos/{repo_name:.*?}",
 
                  action="update", conditions=dict(method=["PUT"],
 
                  function=check_repo))
 
        m.connect("delete_repo", "/repos/{repo_name:.*?}",
 
                  action="delete", conditions=dict(method=["DELETE"],
 
                  ))
 

	
 
    #ADMIN REPOSITORY GROUPS ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/repo_groups') as m:
 
        m.connect("repos_groups", "/repo_groups",
 
                  action="create", conditions=dict(method=["POST"]))
 
        m.connect("repos_groups", "/repo_groups",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("new_repos_group", "/repo_groups/new",
 
                  action="new", conditions=dict(method=["GET"]))
 
        m.connect("update_repos_group", "/repo_groups/{group_name:.*?}",
 
                  action="update", conditions=dict(method=["PUT"],
 
                                                   function=check_group))
 

	
 
        m.connect("repos_group", "/repo_groups/{group_name:.*?}",
 
                  action="show", conditions=dict(method=["GET"],
 
                                                 function=check_group))
 

	
 
        #EXTRAS REPO GROUP ROUTES
 
        m.connect("edit_repo_group", "/repo_groups/{group_name:.*?}/edit",
 
                  action="edit",
 
                  conditions=dict(method=["GET"], function=check_group))
 
        m.connect("edit_repo_group", "/repo_groups/{group_name:.*?}/edit",
 
                  action="edit",
 
                  conditions=dict(method=["PUT"], function=check_group))
 

	
 
        m.connect("edit_repo_group_advanced", "/repo_groups/{group_name:.*?}/edit/advanced",
 
                  action="edit_repo_group_advanced",
 
                  conditions=dict(method=["GET"], function=check_group))
 
        m.connect("edit_repo_group_advanced", "/repo_groups/{group_name:.*?}/edit/advanced",
 
                  action="edit_repo_group_advanced",
 
                  conditions=dict(method=["PUT"], function=check_group))
 

	
 
        m.connect("edit_repo_group_perms", "/repo_groups/{group_name:.*?}/edit/permissions",
 
                  action="edit_repo_group_perms",
 
                  conditions=dict(method=["GET"], function=check_group))
 
        m.connect("edit_repo_group_perms", "/repo_groups/{group_name:.*?}/edit/permissions",
 
                  action="update_perms",
 
                  conditions=dict(method=["PUT"], function=check_group))
 
        m.connect("edit_repo_group_perms", "/repo_groups/{group_name:.*?}/edit/permissions",
 
                  action="delete_perms",
 
                  conditions=dict(method=["DELETE"], function=check_group))
 

	
 
        m.connect("delete_repo_group", "/repo_groups/{group_name:.*?}",
 
                  action="delete", conditions=dict(method=["DELETE"],
 
                                                   function=check_group_skip_path))
 

	
 

	
 
    #ADMIN USER ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/users') as m:
 
        m.connect("users", "/users",
 
                  action="create", conditions=dict(method=["POST"]))
 
        m.connect("users", "/users",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("formatted_users", "/users.{format}",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("new_user", "/users/new",
 
                  action="new", conditions=dict(method=["GET"]))
 
        m.connect("update_user", "/users/{id}",
 
                  action="update", conditions=dict(method=["PUT"]))
 
        m.connect("delete_user", "/users/{id}",
 
                  action="delete", conditions=dict(method=["DELETE"]))
 
        m.connect("edit_user", "/users/{id}/edit",
 
                  action="edit", conditions=dict(method=["GET"]))
 
        m.connect("user", "/users/{id}",
 
                  action="show", conditions=dict(method=["GET"]))
 

	
 
        #EXTRAS USER ROUTES
 
        m.connect("edit_user_advanced", "/users/{id}/edit/advanced",
 
                  action="edit_advanced", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_advanced", "/users/{id}/edit/advanced",
 
                  action="update_advanced", conditions=dict(method=["PUT"]))
 

	
 
        m.connect("edit_user_api_keys", "/users/{id}/edit/api_keys",
 
                  action="edit_api_keys", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_api_keys", "/users/{id}/edit/api_keys",
 
                  action="add_api_key", conditions=dict(method=["POST"]))
 
        m.connect("edit_user_api_keys", "/users/{id}/edit/api_keys",
 
                  action="delete_api_key", conditions=dict(method=["DELETE"]))
 

	
 
        m.connect("edit_user_perms", "/users/{id}/edit/permissions",
 
                  action="edit_perms", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_perms", "/users/{id}/edit/permissions",
 
                  action="update_perms", conditions=dict(method=["PUT"]))
 

	
kallithea/controllers/files.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.files
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Files controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import logging
 
import traceback
 
import tempfile
 
import shutil
 

	
 
from pylons import request, response, tmpl_context as c, url
 
from pylons.i18n.translation import _
 
from pylons.controllers.util import redirect
 
from kallithea.lib.utils import jsonify, action_logger
 

	
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 

	
 
from kallithea.lib.compat import OrderedDict
 
from kallithea.lib.utils2 import convert_line_endings, detect_mode, safe_str,\
 
    str2bool
 
from kallithea.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import RepositoryError, \
 
    ChangesetDoesNotExistError, EmptyRepositoryError, \
 
    ImproperArchiveTypeError, VCSError, NodeAlreadyExistsError,\
 
    NodeDoesNotExistError, ChangesetError, NodeError
 
from kallithea.lib.vcs.nodes import FileNode
 

	
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.db import Repository
 

	
 
from kallithea.controllers.changeset import anchor_url, _ignorews_url,\
 
    _context_url, get_line_ctx, get_ignore_ws
 
from webob.exc import HTTPNotFound
 
from kallithea.lib.exceptions import NonRelativePathError
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FilesController(BaseRepoController):
 

	
 
    def __before__(self):
 
        super(FilesController, self).__before__()
 
        c.cut_off_limit = self.cut_off_limit
 

	
 
    def __get_cs(self, rev, silent_empty=False):
 
        """
 
        Safe way to get changeset if error occur it redirects to tip with
 
        proper message
 

	
 
        :param rev: revision to fetch
 
        :silent_empty: return None if repository is empty
 
        """
 

	
 
        try:
 
            return c.db_repo_scm_instance.get_changeset(rev)
 
        except EmptyRepositoryError as e:
 
            if silent_empty:
 
                return None
 
            url_ = url('files_add_home',
 
                       repo_name=c.repo_name,
 
                       revision=0, f_path='', anchor='edit')
 
            add_new = h.link_to(_('Click here to add new file'), url_, class_="alert-link")
 
            h.flash(h.literal(_('There are no files yet. %s') % add_new),
 
                    category='warning')
 
            raise HTTPNotFound()
 
        except(ChangesetDoesNotExistError, LookupError), e:
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(safe_str(e), category='error')
 
            raise HTTPNotFound()
 

	
 
    def __get_filenode(self, cs, path):
 
        """
 
        Returns file_node or raise HTTP error.
 

	
 
        :param cs: given changeset
 
        :param path: path to lookup
 
        """
 

	
 
        try:
 
            file_node = cs.get_node(path)
 
            if file_node.is_dir():
 
                raise RepositoryError('given path is a directory')
 
        except(ChangesetDoesNotExistError,), e:
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(safe_str(e), category='error')
 
            raise HTTPNotFound()
 

	
 
        return file_node
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
 
                                   'repository.admin')
 
    def index(self, repo_name, revision, f_path, annotate=False):
 
        # redirect to given revision from form if given
 
        post_revision = request.POST.get('at_rev', None)
 
        if post_revision:
 
            cs = self.__get_cs(post_revision) # FIXME - unused!
 

	
 
        c.revision = revision
 
        c.changeset = self.__get_cs(revision)
 
        c.branch = request.GET.get('branch', None)
 
        c.f_path = f_path
 
        c.annotate = annotate
 
        cur_rev = c.changeset.revision
 

	
 
        # prev link
 
        try:
 
            prev_rev = c.db_repo_scm_instance.get_changeset(cur_rev).prev(c.branch)
 
            c.url_prev = url('files_home', repo_name=c.repo_name,
 
                         revision=prev_rev.raw_id, f_path=f_path)
 
            if c.branch:
 
                c.url_prev += '?branch=%s' % c.branch
 
        except (ChangesetDoesNotExistError, VCSError):
 
            c.url_prev = '#'
 

	
 
        # next link
 
        try:
 
            next_rev = c.db_repo_scm_instance.get_changeset(cur_rev).next(c.branch)
 
            c.url_next = url('files_home', repo_name=c.repo_name,
 
                     revision=next_rev.raw_id, f_path=f_path)
 
            if c.branch:
 
                c.url_next += '?branch=%s' % c.branch
 
        except (ChangesetDoesNotExistError, VCSError):
 
            c.url_next = '#'
 

	
 
        # files or dirs
 
        try:
 
            c.file = c.changeset.get_node(f_path)
 

	
 
            if c.file.is_file():
 
                c.load_full_history = False
 
                file_last_cs = c.file.last_changeset
 
                c.file_changeset = (c.changeset
 
                                    if c.changeset.revision < file_last_cs.revision
 
                                    else file_last_cs)
 
                #determine if we're on branch head
 
                _branches = c.db_repo_scm_instance.branches
 
                c.on_branch_head = revision in _branches.keys() + _branches.values()
 
                _hist = []
 
                c.file_history = []
 
                if c.load_full_history:
 
                    c.file_history, _hist = self._get_node_history(c.changeset, f_path)
 

	
 
                c.authors = []
 
                for a in set([x.author for x in _hist]):
 
                    c.authors.append((h.email(a), h.person(a)))
 
            else:
 
                c.authors = c.file_history = []
 
        except RepositoryError as e:
 
            h.flash(safe_str(e), category='error')
 
            raise HTTPNotFound()
 

	
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            return render('files/files_ypjax.html')
 

	
 
        # TODO: tags and bookmarks?
 
        c.revision_options = [(c.changeset.raw_id,
 
                              _('%s at %s') % (c.changeset.branch, h.short_id(c.changeset.raw_id)))] + \
 
            [(n, b) for b, n in c.db_repo_scm_instance.branches.items()]
 
        if c.db_repo_scm_instance.closed_branches:
 
            prefix = _('(closed)') + ' '
 
            c.revision_options += [('-', '-')] + \
 
                [(n, prefix + b) for b, n in c.db_repo_scm_instance.closed_branches.items()]
 

	
 
        return render('files/files.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
 
                                   'repository.admin')
 
    @jsonify
 
    def history(self, repo_name, revision, f_path):
 
        changeset = self.__get_cs(revision)
 
        f_path = f_path
 
        _file = changeset.get_node(f_path)
 
        if _file.is_file():
 
            file_history, _hist = self._get_node_history(changeset, f_path)
 

	
 
            res = []
 
            for obj in file_history:
 
                res.append({
 
                    'text': obj[1],
 
                    'children': [{'id': o[0], 'text': o[1]} for o in obj[0]]
 
                })
 

	
kallithea/lib/auth.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth
 
~~~~~~~~~~~~~~~~~~
 

	
 
authentication and permission libraries
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
from __future__ import with_statement
 
import time
 
import os
 
import logging
 
import traceback
 
import hashlib
 
import itertools
 
import collections
 

	
 
from decorator import decorator
 

	
 
from pylons import url, request
 
from pylons.controllers.util import abort, redirect
 
from pylons.i18n.translation import _
 
from webhelpers.pylonslib import secure_form
 
from sqlalchemy import or_
 
from sqlalchemy.orm.exc import ObjectDeletedError
 
from sqlalchemy.orm import joinedload
 

	
 
from kallithea import __platform__, is_windows, is_unix
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model import meta
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.model.db import User, Repository, Permission, \
 
    UserToPerm, UserGroupRepoToPerm, UserGroupToPerm, UserGroupMember, \
 
    RepoGroup, UserGroupRepoGroupToPerm, UserIpMap, UserGroupUserGroupToPerm, \
 
    UserGroup, UserApiKeys
 

	
 
from kallithea.lib.utils2 import safe_unicode, aslist
 
from kallithea.lib.utils import get_repo_slug, get_repo_group_slug, \
 
    get_user_group_slug, conditional_cache
 
from kallithea.lib.caching_query import FromCache
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PasswordGenerator(object):
 
    """
 
    This is a simple class for generating password from different sets of
 
    characters
 
    usage::
 

	
 
        passwd_gen = PasswordGenerator()
 
        #print 8-letter password containing only big and small letters
 
            of alphabet
 
        passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
 
    """
 
    ALPHABETS_NUM = r'''1234567890'''
 
    ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
 
    ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
 
    ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
 
    ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
 
        + ALPHABETS_NUM + ALPHABETS_SPECIAL
 
    ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
 
    ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
 
    ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
 
    ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
 

	
 
    def gen_password(self, length, alphabet=ALPHABETS_FULL):
 
        assert len(alphabet) <= 256, alphabet
 
        l = []
 
        while len(l) < length:
 
            i = ord(os.urandom(1))
 
            if i < len(alphabet):
 
                l.append(alphabet[i])
 
        return ''.join(l)
 

	
 

	
 
class KallitheaCrypto(object):
 

	
 
    @classmethod
 
    def hash_string(cls, str_):
 
        """
 
        Cryptographic function used for password hashing based on pybcrypt
 
        or Python's own OpenSSL wrapper on windows
 

	
 
        :param password: password to hash
 
        """
 
        if is_windows:
 
            return hashlib.sha256(str_).hexdigest()
 
        elif is_unix:
 
            import bcrypt
 
            return bcrypt.hashpw(str_, bcrypt.gensalt(10))
 
        else:
 
            raise Exception('Unknown or unsupported platform %s' \
 
                            % __platform__)
 

	
 
    @classmethod
 
    def hash_check(cls, password, hashed):
 
        """
 
        Checks matching password with it's hashed value, runs different
 
        implementation based on platform it runs on
 

	
 
        :param password: password
 
        :param hashed: password in hashed form
 
        """
 

	
 
        if is_windows:
 
            return hashlib.sha256(password).hexdigest() == hashed
 
        elif is_unix:
 
            import bcrypt
 
            return bcrypt.hashpw(password, hashed) == hashed
 
        else:
 
            raise Exception('Unknown or unsupported platform %s' \
 
                            % __platform__)
 

	
 

	
 
def get_crypt_password(password):
 
    return KallitheaCrypto.hash_string(password)
 

	
 

	
 
def check_password(password, hashed):
 
    return KallitheaCrypto.hash_check(password, hashed)
 

	
 

	
 

	
 
def _cached_perms_data(user_id, user_is_admin, user_inherit_default_permissions,
 
                       explicit, algo):
 
    RK = 'repositories'
 
    GK = 'repositories_groups'
 
    UK = 'user_groups'
 
    GLOBAL = 'global'
 
    PERM_WEIGHTS = Permission.PERM_WEIGHTS
 
    permissions = {RK: {}, GK: {}, UK: {}, GLOBAL: set()}
 

	
 
    def _choose_perm(new_perm, cur_perm):
 
        new_perm_val = PERM_WEIGHTS[new_perm]
 
        cur_perm_val = PERM_WEIGHTS[cur_perm]
 
        if algo == 'higherwin':
 
            if new_perm_val > cur_perm_val:
 
                return new_perm
 
            return cur_perm
 
        elif algo == 'lowerwin':
 
            if new_perm_val < cur_perm_val:
 
                return new_perm
 
            return cur_perm
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_user = User.get_by_username('default', cache=True)
 
    default_user_id = default_user.user_id
 

	
 
    default_repo_perms = Permission.get_default_perms(default_user_id)
 
    default_repo_groups_perms = Permission.get_default_group_perms(default_user_id)
 
    default_user_group_perms = Permission.get_default_user_group_perms(default_user_id)
 

	
 
    if user_is_admin:
 
        #==================================================================
 
        # admin users have all rights;
 
        # based on default permissions, just set everything to admin
 
        #==================================================================
 
        permissions[GLOBAL].add('hg.admin')
 
        permissions[GLOBAL].add('hg.create.write_on_repogroup.true')
 

	
 
        # repositories
 
        for perm in default_repo_perms:
 
            r_k = perm.UserRepoToPerm.repository.repo_name
 
            p = 'repository.admin'
 
            permissions[RK][r_k] = p
 

	
 
        # repository groups
 
        for perm in default_repo_groups_perms:
 
            rg_k = perm.UserRepoGroupToPerm.group.group_name
 
            p = 'group.admin'
 
            permissions[GK][rg_k] = p
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            permissions[UK][u_k] = p
 
        return permissions
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # default global permissions taken from the default user
 
    default_global_perms = UserToPerm.query()\
 
        .filter(UserToPerm.user_id == default_user_id)\
 
        .options(joinedload(UserToPerm.permission))
 

	
 
    for perm in default_global_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # defaults for repositories, taken from default user
 
    for perm in default_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        if perm.Repository.private and not (perm.Repository.user_id == user_id):
 
            # disable defaults for private repos,
kallithea/lib/indexers/daemon.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.indexers.daemon
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
A daemon will read from task table and run tasks
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 26, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 
import traceback
 

	
 
from shutil import rmtree
 
from time import mktime
 

	
 
from os.path import dirname as dn
 
from os.path import join as jn
 

	
 
# Add location of top level folder to sys.path
 
project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
 
sys.path.append(project_path)
 

	
 
from kallithea.config.conf import INDEX_EXTENSIONS
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.db import Repository
 
from kallithea.lib.utils2 import safe_unicode, safe_str
 
from kallithea.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, \
 
    CHGSET_IDX_NAME
 

	
 
from kallithea.lib.vcs.exceptions import ChangesetError, RepositoryError, \
 
    NodeDoesNotExistError
 

	
 
from whoosh.index import create_in, open_dir, exists_in
 
from whoosh.query import *
 
from whoosh.qparser import QueryParser
 

	
 
log = logging.getLogger('whoosh_indexer')
 

	
 

	
 
class WhooshIndexingDaemon(object):
 
    """
 
    Daemon for atomic indexing jobs
 
    """
 

	
 
    def __init__(self, indexname=IDX_NAME, index_location=None,
 
                 repo_location=None, sa=None, repo_list=None,
 
                 repo_update_list=None):
 
        self.indexname = indexname
 

	
 
        self.index_location = index_location
 
        if not index_location:
 
            raise Exception('You have to provide index location')
 

	
 
        self.repo_location = repo_location
 
        if not repo_location:
 
            raise Exception('You have to provide repositories location')
 

	
 
        self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)
 

	
 
        #filter repo list
 
        if repo_list:
 
            #Fix non-ascii repo names to unicode
 
            repo_list = map(safe_unicode, repo_list)
 
            self.filtered_repo_paths = {}
 
            for repo_name, repo in self.repo_paths.items():
 
                if repo_name in repo_list:
 
                    self.filtered_repo_paths[repo_name] = repo
 

	
 
            self.repo_paths = self.filtered_repo_paths
 

	
 
        #filter update repo list
 
        self.filtered_repo_update_paths = {}
 
        if repo_update_list:
 
            self.filtered_repo_update_paths = {}
 
            for repo_name, repo in self.repo_paths.items():
 
                if repo_name in repo_update_list:
 
                    self.filtered_repo_update_paths[repo_name] = repo
 
            self.repo_paths = self.filtered_repo_update_paths
 

	
 
        self.initial = True
 
        if not os.path.isdir(self.index_location):
 
            os.makedirs(self.index_location)
 
            log.info('Cannot run incremental index since it does not '
 
                     'yet exist running full build')
 
        elif not exists_in(self.index_location, IDX_NAME):
 
            log.info('Running full index build as the file content '
 
                     'index does not exist')
 
        elif not exists_in(self.index_location, CHGSET_IDX_NAME):
 
            log.info('Running full index build as the changeset '
 
                     'index does not exist')
 
        else:
 
            self.initial = False
 

	
 
    def _get_index_revision(self, repo):
 
        db_repo = Repository.get_by_repo_name(repo.name_unicode)
 
        landing_rev = 'tip'
 
        if db_repo:
 
            _rev_type, _rev = db_repo.landing_rev
 
            landing_rev = _rev
 
        return landing_rev
 

	
 
    def _get_index_changeset(self, repo, index_rev=None):
 
        if not index_rev:
 
            index_rev = self._get_index_revision(repo)
 
        cs = repo.get_changeset(index_rev)
 
        return cs
 

	
 
    def get_paths(self, repo):
 
        """
 
        recursive walk in root dir and return a set of all path in that dir
 
        based on repository walk function
 
        """
 
        index_paths_ = set()
 
        try:
 
            cs = self._get_index_changeset(repo)
 
            for _topnode, _dirs, files in cs.walk('/'):
 
                for f in files:
 
                    index_paths_.add(jn(safe_str(repo.path), safe_str(f.path)))
 

	
 
        except RepositoryError:
 
            log.debug(traceback.format_exc())
 
            pass
 
        return index_paths_
 

	
 
    def get_node(self, repo, path, index_rev=None):
 
        """
 
        gets a filenode based on given full path. It operates on string for
 
        hg git compatibility.
 

	
 
        :param repo: scm repo instance
 
        :param path: full path including root location
 
        :return: FileNode
 
        """
 
        # FIXME: paths should be normalized ... or even better: don't include repo.path
 
        path = safe_str(path)
 
        repo_path = safe_str(repo.path)
 
        assert path.startswith(repo_path)
 
        assert path[len(repo_path)] in (os.path.sep, os.path.altsep)
 
        node_path = path[len(repo_path) + 1:]
 
        cs = self._get_index_changeset(repo, index_rev=index_rev)
 
        node = cs.get_node(node_path)
 
        return node
 

	
 
    def get_node_mtime(self, node):
 
        return mktime(node.last_changeset.date.timetuple())
 

	
 
    def add_doc(self, writer, path, repo, repo_name, index_rev=None):
 
        """
 
        Adding doc to writer this function itself fetches data from
 
        the instance of vcs backend
 
        """
 

	
 
        node = self.get_node(repo, path, index_rev)
 
        indexed = indexed_w_content = 0
 
        # we just index the content of chosen files, and skip binary files
 
        if node.extension in INDEX_EXTENSIONS and not node.is_binary:
 
            u_content = node.content
 
            if not isinstance(u_content, unicode):
 
                log.warning('  >> %s Could not get this content as unicode '
 
                            'replacing with empty content' % path)
 
                u_content = u''
 
            else:
 
                log.debug('    >> %s [WITH CONTENT]', path)
 
                indexed_w_content += 1
 

	
 
        else:
 
            log.debug('    >> %s', path)
 
            # just index file name without it's content
 
            u_content = u''
 
            indexed += 1
 

	
 
        p = safe_unicode(path)
 
        writer.add_document(
 
            fileid=p,
 
            owner=unicode(repo.contact),
 
            repository=safe_unicode(repo_name),
 
            path=p,
 
            content=u_content,
 
            modtime=self.get_node_mtime(node),
 
            extension=node.extension
 
        )
 
        return indexed, indexed_w_content
 

	
 
    def index_changesets(self, writer, repo_name, repo, start_rev=None):
 
        """
 
        Add all changeset in the vcs repo starting at start_rev
 
        to the index writer
 

	
 
        :param writer: the whoosh index writer to add to
 
        :param repo_name: name of the repository from whence the
 
          changeset originates including the repository group
 
        :param repo: the vcs repository instance to index changesets for,
 
          the presumption is the repo has changesets to index
 
        :param start_rev=None: the full sha id to start indexing from
 
          if start_rev is None then index from the first changeset in
 
          the repo
 
        """
 

	
kallithea/lib/paster_commands/cache_keys.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.cache_keys
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
cleanup-keys paster command for Kallithea
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: mar 27, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 

	
 
from kallithea.model.meta import Session
 
from kallithea.lib.utils import BasePasterCommand
 
from kallithea.model.db import CacheInvalidation
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Cache keys utils"
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 
        _caches = CacheInvalidation.query().order_by(CacheInvalidation.cache_key).all()
 
        if self.options.show:
 
            for c_obj in _caches:
 
                print 'key:%s active:%s' % (c_obj.cache_key, c_obj.cache_active)
 
        elif self.options.cleanup:
 
            for c_obj in _caches:
 
                Session().delete(c_obj)
 
                print 'removing key:%s' % (c_obj.cache_key)
 
                Session().commit()
 
        else:
 
            print 'nothing done exiting...'
 
        sys.exit(0)
 

	
 
    def update_parser(self):
 
        self.parser.add_option(
 
            '--show',
 
            action='store_true',
 
            dest='show',
 
            help=("show existing cache keys with together with status")
 
        )
 

	
 
        self.parser.add_option(
 
            '--cleanup',
 
            action="store_true",
 
            dest="cleanup",
 
            help="cleanup existing cache keys"
 
        )
kallithea/lib/paster_commands/cleanup.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.cleanup
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
cleanup-repos paster command for Kallithea
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jul 14, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import re
 
import shutil
 
import logging
 
import datetime
 

	
 
from kallithea.lib.utils import BasePasterCommand, ask_ok, REMOVED_REPO_PAT
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.model.db import Ui
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Cleanup deleted repos"
 

	
 
    def _parse_older_than(self, val):
 
        regex = re.compile(r'((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)m)?((?P<seconds>\d+?)s)?')
 
        parts = regex.match(val)
 
        if not parts:
 
            return
 
        parts = parts.groupdict()
 
        time_params = {}
 
        for (name, param) in parts.iteritems():
 
            if param:
 
                time_params[name] = int(param)
 
        return datetime.timedelta(**time_params)
 

	
 
    def _extract_date(self, name):
 
        """
 
        Extract the date part from rm__<date> pattern of removed repos,
 
        and convert it to datetime object
 

	
 
        :param name:
 
        """
 
        date_part = name[4:19]  # 4:19 since we don't parse milisecods
 
        return datetime.datetime.strptime(date_part, '%Y%m%d_%H%M%S')
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 

	
 
        repos_location = Ui.get_repos_location()
 
        to_remove = []
 
        for dn_, dirs, f in os.walk(safe_str(repos_location)):
 
            alldirs = list(dirs)
 
            del dirs[:]
 
            if ('.hg' in alldirs or
 
                'objects' in alldirs and ('refs' in alldirs or 'packed-refs' in f)):
 
                continue
 
            for loc in alldirs:
 
                if REMOVED_REPO_PAT.match(loc):
 
                    to_remove.append([os.path.join(dn_, loc),
 
                                      self._extract_date(loc)])
 
                else:
 
                    dirs.append(loc)
 

	
 
        #filter older than (if present)!
 
        now = datetime.datetime.now()
 
        older_than = self.options.older_than
 
        if older_than:
 
            to_remove_filtered = []
 
            older_than_date = self._parse_older_than(older_than)
 
            for name, date_ in to_remove:
 
                repo_age = now - date_
 
                if repo_age > older_than_date:
 
                    to_remove_filtered.append([name, date_])
 

	
 
            to_remove = to_remove_filtered
 
            print >> sys.stdout, 'removing %s deleted repos older than %s (%s)' \
 
                % (len(to_remove), older_than, older_than_date)
 
        else:
 
            print >> sys.stdout, 'removing all [%s] deleted repos' \
 
                % len(to_remove)
 
        if self.options.dont_ask or not to_remove:
 
            # don't ask just remove !
 
            remove = True
 
        else:
 
            remove = ask_ok('the following repositories will be deleted completely:\n%s\n'
 
                            'are you sure you want to remove them [y/n]?'
 
                            % ', \n'.join(['%s removed on %s'
 
                    % (safe_str(x[0]), safe_str(x[1])) for x in to_remove]))
 

	
 
        if remove:
 
            for path, date_ in to_remove:
 
                print >> sys.stdout, 'removing repository %s' % path
 
                shutil.rmtree(path)
 
        else:
 
            print 'nothing done exiting...'
 
            sys.exit(0)
 

	
 
    def update_parser(self):
 
        self.parser.add_option(
 
            '--older-than',
 
            action='store',
 
            dest='older_than',
 
            help=("only remove repos that have been removed "
 
                 "at least given time ago. "
 
                 "The default is to remove all removed repositories. "
 
                 "Possible suffixes: "
 
                 "d (days), h (hours), m (minutes), s (seconds). "
 
                 "For example --older-than=30d deletes repositories "
 
                 "removed more than 30 days ago.")
 
            )
 

	
 
        self.parser.add_option(
 
            '--dont-ask',
 
            action="store_true",
 
            dest="dont_ask",
 
            help="remove repositories without asking for confirmation."
 
        )
kallithea/lib/paster_commands/ishell.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.ishell
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
interactive shell paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 

	
 
from kallithea.lib.utils import BasePasterCommand
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Interactive shell"
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 

	
 
        # imports, used in ipython shell
 
        import os
 
        import sys
 
        import time
 
        import shutil
 
        import datetime
 
        from kallithea.model.db import *
 

	
 
        try:
 
            from IPython import embed
 
            from IPython.config.loader import Config
 
            cfg = Config()
 
            cfg.InteractiveShellEmbed.confirm_exit = False
 
            embed(config=cfg, banner1="Kallithea IShell.")
 
        except ImportError:
 
            print 'ipython installation required for ishell'
 
            sys.exit(-1)
 

	
 
    def update_parser(self):
 
        pass
kallithea/lib/paster_commands/make_index.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.make_index
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
make-index paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 17, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 

	
 
from string import strip
 
from kallithea.model.repo import RepoModel
 
from kallithea.lib.utils import BasePasterCommand, load_rcextensions
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Creates or updates full text search index"
 

	
 
    def command(self):
 
        logging.config.fileConfig(self.path_to_ini_file)
 
        #get SqlAlchemy session
 
        self._init_session()
 
        from pylons import config
 
        index_location = config['index_dir']
 
        load_rcextensions(config['here'])
 

	
 
        repo_location = self.options.repo_location \
 
            if self.options.repo_location else RepoModel().repos_path
 
        repo_list = map(strip, self.options.repo_list.split(',')) \
 
            if self.options.repo_list else None
 

	
 
        repo_update_list = map(strip, self.options.repo_update_list.split(',')) \
 
            if self.options.repo_update_list else None
 

	
 
        #======================================================================
 
        # WHOOSH DAEMON
 
        #======================================================================
 
        from kallithea.lib.pidlock import LockHeld, DaemonLock
 
        from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
        try:
 
            l = DaemonLock(file_=os.path.join(dn(dn(index_location)),
 
                                              'make_index.lock'))
 
            WhooshIndexingDaemon(index_location=index_location,
 
                                 repo_location=repo_location,
 
                                 repo_list=repo_list,
 
                                 repo_update_list=repo_update_list)\
 
                .run(full_index=self.options.full_index)
 
            l.release()
 
        except LockHeld:
 
            sys.exit(1)
 

	
 
    def update_parser(self):
 
        self.parser.add_option('--repo-location',
 
                          action='store',
 
                          dest='repo_location',
 
                          help="Specifies repositories location to index OPTIONAL",
 
                          )
 
        self.parser.add_option('--index-only',
 
                          action='store',
 
                          dest='repo_list',
 
                          help="Specifies a comma separated list of repositories "
 
                                "to build index on. If not given all repositories "
 
                                "are scanned for indexing. OPTIONAL",
 
                          )
 
        self.parser.add_option('--update-only',
 
                          action='store',
 
                          dest='repo_update_list',
 
                          help="Specifies a comma separated list of repositories "
 
                                "to re-build index on. OPTIONAL",
 
                          )
 
        self.parser.add_option('-f',
 
                          action='store_true',
 
                          dest='full_index',
 
                          help="Specifies that index should be made full i.e"
 
                                " destroy old and build from scratch",
 
                          default=False)
kallithea/lib/paster_commands/make_rcextensions.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.make_rcextensions
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
make-rcext paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import pkg_resources
 

	
 
from kallithea.lib.utils import BasePasterCommand, ask_ok
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Write template file for extending Kallithea in Python."
 
    usage = "CONFIG_FILE"
 
    description = '''\
 
        A rcextensions directory with a __init__.py file will be created next to
 
        the ini file. Local customizations in that file will survive upgrades.
 
        The file contains instructions on how it can be customized.
 
        '''
 

	
 
    def command(self):
 
        from pylons import config
 

	
 
        here = config['here']
 
        content = pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'rcextensions', '__init__.py')
 
        )
 
        ext_file = os.path.join(here, 'rcextensions', '__init__.py')
 
        if os.path.exists(ext_file):
 
            msg = ('Extension file already exists, do you want '
 
                   'to overwrite it ? [y/n]')
 
            if not ask_ok(msg):
 
                print 'Nothing done...'
 
                return
 

	
 
        dirname = os.path.dirname(ext_file)
 
        if not os.path.isdir(dirname):
 
            os.makedirs(dirname)
 
        with open(ext_file, 'wb') as f:
 
            f.write(content)
 
            print 'Wrote new extensions file to %s' % ext_file
 

	
 
    def update_parser(self):
 
        pass
kallithea/lib/paster_commands/repo_scan.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.repo_scan
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
repo-scan paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Feb 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 

	
 
from kallithea.model.scm import ScmModel
 
from kallithea.lib.utils import BasePasterCommand, repo2db_mapper
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Rescan default location for new repositories"
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 
        rm_obsolete = self.options.delete_obsolete
 
        log.info('Now scanning root location for new repos...')
 
        added, removed = repo2db_mapper(ScmModel().repo_scan(),
 
                                        remove_obsolete=rm_obsolete)
 
        added = ', '.join(added) or '-'
 
        removed = ', '.join(removed) or '-'
 
        log.info('Scan completed added: %s removed: %s', added, removed)
 

	
 
    def update_parser(self):
 
        self.parser.add_option(
 
            '--delete-obsolete',
 
            action='store_true',
 
            help="Use this flag do delete repositories that are "
 
                 "present in Kallithea database but not on the filesystem",
 
        )
kallithea/lib/paster_commands/update_repoinfo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.update_repoinfo
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
update-repoinfo paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jul 14, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import logging
 
import string
 

	
 
from kallithea.lib.utils import BasePasterCommand
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.meta import Session
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname as dn
 
rc_path = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Updates repositories caches for last changeset"
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 

	
 
        repo_update_list = map(string.strip,
 
                               self.options.repo_update_list.split(',')) \
 
                               if self.options.repo_update_list else None
 

	
 
        if repo_update_list:
 
            repo_list = list(Repository.query()\
 
                .filter(Repository.repo_name.in_(repo_update_list)))
 
        else:
 
            repo_list = Repository.getAll()
 
        RepoModel.update_repoinfo(repositories=repo_list)
 
        Session().commit()
 

	
 
        if self.options.invalidate_cache:
 
            for r in repo_list:
 
                r.set_invalidate()
 
        print 'Updated cache for %s repositories' % (len(repo_list))
 

	
 
    def update_parser(self):
 
        self.parser.add_option('--update-only',
 
                           action='store',
 
                           dest='repo_update_list',
 
                           help="Specifies a comma separated list of repositories "
 
                                "to update last commit info for. OPTIONAL")
 
        self.parser.add_option('--invalidate-cache',
 
                           action='store_true',
 
                           dest='invalidate_cache',
 
                           help="Trigger cache invalidation event for repos. "
 
                                "OPTIONAL")
kallithea/lib/pidlock.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from __future__ import with_statement
 
import os
 
import errno
 

	
 
from multiprocessing.util import Finalize
 

	
 
from kallithea.lib.compat import kill
 

	
 

	
 
class LockHeld(Exception):
 
    pass
 

	
 

	
 
class DaemonLock(object):
 
    """daemon locking
 
    USAGE:
 
    try:
 
        l = DaemonLock(file_='/path/tolockfile',desc='test lock')
 
        main()
 
        l.release()
 
    except LockHeld:
 
        sys.exit(1)
 
    """
 

	
 
    def __init__(self, file_=None, callbackfn=None,
 
                 desc='daemon lock', debug=False):
 

	
 
        lock_name = os.path.join(os.path.dirname(__file__), 'running.lock')
 
        self.pidfile = file_ if file_ else lock_name
 
        self.callbackfn = callbackfn
 
        self.desc = desc
 
        self.debug = debug
 
        self.held = False
 
        #run the lock automatically !
 
        self.lock()
 
        self._finalize = Finalize(self, DaemonLock._on_finalize,
 
                                  args=(self, debug), exitpriority=10)
 

	
 
    @staticmethod
 
    def _on_finalize(lock, debug):
 
        if lock.held:
 
            if debug:
 
                print 'lock held finalizing and running lock.release()'
 
            lock.release()
 

	
 
    def lock(self):
 
        """
 
        locking function, if lock is present it
 
        will raise LockHeld exception
 
        """
 
        lockname = str(os.getpid())
 
        if self.debug:
 
            print 'running lock'
 
        self.trylock()
 
        self.makelock(lockname, self.pidfile)
 
        return True
 

	
 
    def trylock(self):
 
        running_pid = False
 
        if self.debug:
 
            print 'checking for already running process'
 
        try:
 
            with open(self.pidfile, 'r') as f:
 
                try:
 
                    running_pid = int(f.readline())
 
                except ValueError:
 
                    running_pid = -1
 

	
 
            if self.debug:
 
                print ('lock file present running_pid: %s, '
 
                       'checking for execution' % (running_pid,))
 
            # Now we check the PID from lock file matches to the current
 
            # process PID
 
            if running_pid:
 
                try:
 
                    kill(running_pid, 0)
 
                except OSError as exc:
 
                    if exc.errno in (errno.ESRCH, errno.EPERM):
 
                        print ("Lock File is there but"
 
                               " the program is not running")
 
                        print "Removing lock file for the: %s" % running_pid
 
                        self.release()
 
                    else:
 
                        raise
 
                else:
 
                    print "You already have an instance of the program running"
 
                    print "It is running as process %s" % running_pid
 
                    raise LockHeld()
 

	
 
        except IOError as e:
 
            if e.errno != 2:
 
                raise
 

	
 
    def release(self):
 
        """releases the pid by removing the pidfile
 
        """
 
        if self.debug:
 
            print 'trying to release the pidlock'
 

	
 
        if self.callbackfn:
 
            #execute callback function on release
 
            if self.debug:
 
                print 'executing callback function %s' % self.callbackfn
 
            self.callbackfn()
 
        try:
 
            if self.debug:
 
                print 'removing pidfile %s' % self.pidfile
 
            os.remove(self.pidfile)
 
            self.held = False
 
        except OSError as e:
 
            if self.debug:
 
                print 'removing pidfile failed %s' % e
 
            pass
 

	
 
    def makelock(self, lockname, pidfile):
 
        """
 
        this function will make an actual lock
 

	
 
        :param lockname: actual pid of file
 
        :param pidfile: the file to write the pid in
 
        """
 
        if self.debug:
 
            print 'creating a file %s and pid: %s' % (pidfile, lockname)
 

	
 
        dir_, file_ = os.path.split(pidfile)
 
        if not os.path.isdir(dir_):
 
            os.makedirs(dir_)
 
        with open(self.pidfile, 'wb') as f:
 
            f.write(lockname)
 
        self.held = True
kallithea/lib/profiler.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import objgraph
 
import cProfile
 
import pstats
 
import cgi
 
import pprint
 
import threading
 

	
 
from StringIO import StringIO
 

	
 

	
 
class ProfilingMiddleware(object):
 
    def __init__(self, app):
 
        self.lock = threading.Lock()
 
        self.app = app
 

	
 
    def __call__(self, environ, start_response):
 
        with self.lock:
 
            profiler = cProfile.Profile()
 

	
 
            def run_app(*a, **kw):
 
                self.response = self.app(environ, start_response)
 

	
 
            profiler.runcall(run_app, environ, start_response)
 

	
 
            profiler.snapshot_stats()
 

	
 
            stats = pstats.Stats(profiler)
 
            stats.sort_stats('calls') #cumulative
 

	
 
            # Redirect output
 
            out = StringIO()
 
            stats.stream = out
 

	
 
            stats.print_stats()
 

	
 
            resp = ''.join(self.response)
 

	
 
            # Lets at least only put this on html-like responses.
 
            if resp.strip().startswith('<'):
 
                ## The profiling info is just appended to the response.
 
                ##  Browsers don't mind this.
 
                resp += ('<pre style="text-align:left; '
 
                         'border-top: 4px dashed red; padding: 1em;">')
 
                resp += cgi.escape(out.getvalue(), True)
 

	
 
                ct = objgraph.show_most_common_types()
 
                print ct
 

	
 
                resp += ct if ct else '---'
 

	
 
                output = StringIO()
 
                pprint.pprint(environ, output, depth=3)
 

	
 
                resp += cgi.escape(output.getvalue(), True)
 
                resp += '</pre>'
 

	
 
            return resp
kallithea/model/api_key.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.api_key
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
API key model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Sep 8, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import time
 
import logging
 
from sqlalchemy import or_
 

	
 
from kallithea.lib.utils2 import generate_api_key
 
from kallithea.model import BaseModel
 
from kallithea.model.db import UserApiKeys
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ApiKeyModel(BaseModel):
 
    cls = UserApiKeys
 

	
 
    def create(self, user, description, lifetime=-1):
 
        """
 
        :param user: user or user_id
 
        :param description: description of ApiKey
 
        :param lifetime: expiration time in seconds
 
        """
 
        user = self._get_user(user)
 

	
 
        new_api_key = UserApiKeys()
 
        new_api_key.api_key = generate_api_key()
 
        new_api_key.user_id = user.user_id
 
        new_api_key.description = description
 
        new_api_key.expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        Session().add(new_api_key)
 

	
 
        return new_api_key
 

	
 
    def delete(self, api_key, user=None):
 
        """
 
        Deletes given api_key, if user is set it also filters the object for
 
        deletion by given user.
 
        """
 
        api_key = UserApiKeys.query().filter(UserApiKeys.api_key == api_key)
 

	
 
        if user is not None:
 
            user = self._get_user(user)
 
            api_key = api_key.filter(UserApiKeys.user_id == user.user_id)
 

	
 
        api_key = api_key.scalar()
 
        Session().delete(api_key)
 

	
 
    def get_api_keys(self, user, show_expired=True):
 
        user = self._get_user(user)
 
        user_api_keys = UserApiKeys.query()\
 
            .filter(UserApiKeys.user_id == user.user_id)
 
        if not show_expired:
 
            user_api_keys = user_api_keys\
 
                .filter(or_(UserApiKeys.expires == -1,
 
                            UserApiKeys.expires >= time.time()))
 
        return user_api_keys
kallithea/model/gist.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.gist
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
gist model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import time
 
import logging
 
import traceback
 
import shutil
 

	
 
from kallithea.lib.utils2 import safe_unicode, unique_id, safe_int, \
 
    time_to_datetime, AttributeDict
 
from kallithea.lib.compat import json
 
from kallithea.model import BaseModel
 
from kallithea.model.db import Gist
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 
log = logging.getLogger(__name__)
 

	
 
GIST_STORE_LOC = '.rc_gist_store'
 
GIST_METADATA_FILE = '.rc_gist_metadata'
 

	
 

	
 
class GistModel(BaseModel):
 
    cls = Gist
 

	
 
    def _get_gist(self, gist):
 
        """
 
        Helper method to get gist by ID, or gist_access_id as a fallback
 

	
 
        :param gist: GistID, gist_access_id, or Gist instance
 
        """
 
        return self._get_instance(Gist, gist, callback=Gist.get_by_access_id)
 

	
 
    def __delete_gist(self, gist):
 
        """
 
        removes gist from filesystem
 

	
 
        :param gist: gist object
 
        """
 
        root_path = RepoModel().repos_path
 
        rm_path = os.path.join(root_path, GIST_STORE_LOC, gist.gist_access_id)
 
        log.info("Removing %s", rm_path)
 
        shutil.rmtree(rm_path)
 

	
 
    def _store_metadata(self, repo, gist_id, gist_access_id, user_id, gist_type,
 
                        gist_expires):
 
        """
 
        store metadata inside the gist, this can be later used for imports
 
        or gist identification
 
        """
 
        metadata = {
 
            'metadata_version': '1',
 
            'gist_db_id': gist_id,
 
            'gist_access_id': gist_access_id,
 
            'gist_owner_id': user_id,
 
            'gist_type': gist_type,
 
            'gist_expires': gist_expires,
 
            'gist_updated': time.time(),
 
        }
 
        with open(os.path.join(repo.path, '.hg', GIST_METADATA_FILE), 'wb') as f:
 
            f.write(json.dumps(metadata))
 

	
 
    def get_gist(self, gist):
 
        return self._get_gist(gist)
 

	
 
    def get_gist_files(self, gist_access_id, revision=None):
 
        """
 
        Get files for given gist
 

	
 
        :param gist_access_id:
 
        """
 
        repo = Gist.get_by_access_id(gist_access_id)
 
        cs = repo.scm_instance.get_changeset(revision)
 
        return cs, [n for n in cs.get_node('/')]
 

	
 
    def create(self, description, owner, gist_mapping,
 
               gist_type=Gist.GIST_PUBLIC, lifetime=-1):
 
        """
 

	
 
        :param description: description of the gist
 
        :param owner: user who created this gist
 
        :param gist_mapping: mapping {filename:{'content':content},...}
 
        :param gist_type: type of gist private/public
 
        :param lifetime: in minutes, -1 == forever
 
        """
 
        owner = self._get_user(owner)
 
        gist_id = safe_unicode(unique_id(20))
 
        lifetime = safe_int(lifetime, -1)
 
        gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        log.debug('set GIST expiration date to: %s',
 
                  time_to_datetime(gist_expires)
 
                   if gist_expires != -1 else 'forever')
 
        #create the Database version
 
        gist = Gist()
 
        gist.gist_description = description
 
        gist.gist_access_id = gist_id
 
        gist.gist_owner = owner.user_id
 
        gist.gist_expires = gist_expires
 
        gist.gist_type = safe_unicode(gist_type)
 
        self.sa.add(gist)
 
        self.sa.flush()
 
        if gist_type == Gist.GIST_PUBLIC:
 
            # use DB ID for easy to use GIST ID
 
            gist_id = safe_unicode(gist.gist_id)
 
            gist.gist_access_id = gist_id
 
            self.sa.add(gist)
 

	
 
        gist_repo_path = os.path.join(GIST_STORE_LOC, gist_id)
 
        log.debug('Creating new %s GIST repo in %s', gist_type, gist_repo_path)
 
        repo = RepoModel()._create_filesystem_repo(
 
            repo_name=gist_id, repo_type='hg', repo_group=GIST_STORE_LOC)
 

	
 
        processed_mapping = {}
 
        for filename in gist_mapping:
 
            if filename != os.path.basename(filename):
 
                raise Exception('Filename cannot be inside a directory')
 

	
 
            content = gist_mapping[filename]['content']
 
            #TODO: expand support for setting explicit lexers
 
#             if lexer is None:
 
#                 try:
 
#                     guess_lexer = pygments.lexers.guess_lexer_for_filename
 
#                     lexer = guess_lexer(filename,content)
 
#                 except pygments.util.ClassNotFound:
 
#                     lexer = 'text'
 
            processed_mapping[filename] = {'content': content}
 

	
 
        # now create single multifile commit
 
        message = 'added file'
 
        message += 's: ' if len(processed_mapping) > 1 else ': '
 
        message += ', '.join([x for x in processed_mapping])
 

	
 
        #fake Kallithea Repository object
 
        fake_repo = AttributeDict(dict(
 
            repo_name=gist_repo_path,
 
            scm_instance_no_cache=lambda: repo,
 
        ))
 
        ScmModel().create_nodes(
 
            user=owner.user_id, repo=fake_repo,
 
            message=message,
 
            nodes=processed_mapping,
 
            trigger_push_hook=False
 
        )
 

	
 
        self._store_metadata(repo, gist.gist_id, gist.gist_access_id,
 
                             owner.user_id, gist.gist_type, gist.gist_expires)
 
        return gist
 

	
 
    def delete(self, gist, fs_remove=True):
 
        gist = self._get_gist(gist)
 
        try:
 
            self.sa.delete(gist)
 
            if fs_remove:
 
                self.__delete_gist(gist)
 
            else:
 
                log.debug('skipping removal from filesystem')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def update(self, gist, description, owner, gist_mapping, gist_type,
 
               lifetime):
 
        gist = self._get_gist(gist)
 
        gist_repo = gist.scm_instance
 

	
 
        lifetime = safe_int(lifetime, -1)
 
        if lifetime == 0:  # preserve old value
 
            gist_expires = gist.gist_expires
 
        else:
 
            gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 

	
 
        #calculate operation type based on given data
 
        gist_mapping_op = {}
 
        for k, v in gist_mapping.items():
 
            # add, mod, del
 
            if not v['org_filename'] and v['filename']:
 
                op = 'add'
 
            elif v['org_filename'] and not v['filename']:
 
                op = 'del'
 
            else:
 
                op = 'mod'
 

	
 
            v['op'] = op
 
            gist_mapping_op[k] = v
 

	
 
        gist.gist_description = description
 
        gist.gist_expires = gist_expires
 
        gist.owner = owner
 
        gist.gist_type = gist_type
 
        self.sa.add(gist)
 
        self.sa.flush()
 

	
 
        message = 'updated file'
 
        message += 's: ' if len(gist_mapping) > 1 else ': '
kallithea/model/repo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.repo
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Repository model for kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 5, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import shutil
 
import logging
 
import traceback
 
from datetime import datetime
 
from sqlalchemy.orm import subqueryload
 

	
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends import get_backend
 
from kallithea.lib.compat import json
 
from kallithea.lib.utils2 import LazyProperty, safe_str, safe_unicode, \
 
    remove_prefix, obfuscate_url_pw, get_current_authuser
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.hooks import log_delete_repository
 

	
 
from kallithea.model import BaseModel
 
from kallithea.model.db import Repository, UserRepoToPerm, UserGroupRepoToPerm, \
 
    UserRepoGroupToPerm, UserGroupRepoGroupToPerm, User, Permission, \
 
    Statistics, UserGroup, Ui, RepoGroup, RepositoryField
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionAny, HasUserGroupPermissionAny
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.model.scm import UserGroupList
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoModel(BaseModel):
 

	
 
    cls = Repository
 
    URL_SEPARATOR = Repository.url_sep()
 

	
 
    def _get_user_group(self, users_group):
 
        return self._get_instance(UserGroup, users_group,
 
                                  callback=UserGroup.get_by_group_name)
 

	
 
    def _get_repo_group(self, repo_group):
 
        return self._get_instance(RepoGroup, repo_group,
 
                                  callback=RepoGroup.get_by_group_name)
 

	
 
    def _create_default_perms(self, repository, private):
 
        # create default permission
 
        default = 'repository.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('repository.'):
 
                default = p.permission.permission_name
 
                break
 

	
 
        default_perm = 'repository.none' if private else default
 

	
 
        repo_to_perm = UserRepoToPerm()
 
        repo_to_perm.permission = Permission.get_by_key(default_perm)
 

	
 
        repo_to_perm.repository = repository
 
        repo_to_perm.user_id = def_user.user_id
 

	
 
        return repo_to_perm
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = self.sa.query(Ui).filter(Ui.ui_key == '/').one()
 
        return q.ui_value
 

	
 
    def get(self, repo_id, cache=False):
 
        repo = self.sa.query(Repository) \
 
            .filter(Repository.repo_id == repo_id)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_id))
 
        return repo.scalar()
 

	
 
    def get_repo(self, repository):
 
        return self._get_repo(repository)
 

	
 
    def get_by_repo_name(self, repo_name, cache=False):
 
        repo = self.sa.query(Repository) \
 
            .filter(Repository.repo_name == repo_name)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_name))
 
        return repo.scalar()
 

	
 
    def get_all_user_repos(self, user):
 
        """
 
        Gets all repositories that user have at least read access
 

	
 
        :param user:
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        user = self._get_user(user)
 
        repos = AuthUser(user_id=user.user_id).permissions['repositories']
 
        access_check = lambda r: r[1] in ['repository.read',
 
                                          'repository.write',
 
                                          'repository.admin']
 
        repos = [x[0] for x in filter(access_check, repos.items())]
 
        return Repository.query().filter(Repository.repo_name.in_(repos))
 

	
 
    def get_users_js(self):
 
        users = self.sa.query(User).filter(User.active == True).all()
 
        return json.dumps([
 
            {
 
                'id': u.user_id,
 
                'fname': h.escape(u.name),
 
                'lname': h.escape(u.lastname),
 
                'nname': u.username,
 
                'gravatar_lnk': h.gravatar_url(u.email, size=28),
 
                'gravatar_size': 14,
 
            } for u in users]
 
        )
 

	
 
    def get_user_groups_js(self):
 
        user_groups = self.sa.query(UserGroup) \
 
            .filter(UserGroup.users_group_active == True) \
 
            .options(subqueryload(UserGroup.members)) \
 
            .all()
 
        user_groups = UserGroupList(user_groups, perm_set=['usergroup.read',
 
                                                           'usergroup.write',
 
                                                           'usergroup.admin'])
 
        return json.dumps([
 
            {
 
                'id': gr.users_group_id,
 
                'grname': gr.users_group_name,
 
                'grmembers': len(gr.members),
 
            } for gr in user_groups]
 
        )
 

	
 
    @classmethod
 
    def _render_datatable(cls, tmpl, *args, **kwargs):
 
        import kallithea
 
        from pylons import tmpl_context as c
 
        from pylons.i18n.translation import _
 

	
 
        _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        tmpl = template.get_def(tmpl)
 
        kwargs.update(dict(_=_, h=h, c=c))
 
        return tmpl.render(*args, **kwargs)
 

	
 
    @classmethod
 
    def update_repoinfo(cls, repositories=None):
 
        if not repositories:
 
            repositories = Repository.getAll()
 
        for repo in repositories:
 
            repo.update_changeset_cache()
 

	
 
    def get_repos_as_dict(self, repos_list=None, admin=False, perm_check=True,
 
                          super_user_actions=False):
 
        _render = self._render_datatable
 
        from pylons import tmpl_context as c
 

	
 
        def quick_menu(repo_name):
 
            return _render('quick_menu', repo_name)
 

	
 
        def repo_lnk(name, rtype, rstate, private, fork_of):
 
            return _render('repo_name', name, rtype, rstate, private, fork_of,
 
                           short_name=not admin, admin=False)
 

	
 
        def last_change(last_change):
 
            return _render("last_change", last_change)
 

	
 
        def rss_lnk(repo_name):
 
            return _render("rss", repo_name)
 

	
 
        def atom_lnk(repo_name):
 
            return _render("atom", repo_name)
 

	
 
        def last_rev(repo_name, cs_cache):
 
            return _render('revision', repo_name, cs_cache.get('revision'),
 
                           cs_cache.get('raw_id'), cs_cache.get('author'),
 
                           cs_cache.get('message'))
 

	
 
        def desc(desc):
 
            return h.urlify_text(desc, truncate=60, stylize=c.visual.stylify_metatags)
 

	
 
        def state(repo_state):
 
            return _render("repo_state", repo_state)
 

	
 
        def repo_actions(repo_name):
 
            return _render('repo_actions', repo_name, super_user_actions)
 

	
 
        def owner_actions(user_id, username):
 
            return _render('user_name', user_id, username)
 

	
kallithea/model/scm.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.scm
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Scm model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import re
 
import time
 
import traceback
 
import logging
 
import cStringIO
 
import pkg_resources
 
from os.path import join as jn
 

	
 
from sqlalchemy import func
 
from pylons.i18n.translation import _
 

	
 
import kallithea
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea import BACKENDS
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url,\
 
    _set_extras
 
from kallithea.lib.auth import HasRepoPermissionAny, HasRepoGroupPermissionAny,\
 
    HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAll
 
from kallithea.lib.utils import get_filesystem_repos, make_ui, \
 
    action_logger
 
from kallithea.model import BaseModel
 
from kallithea.model.db import Repository, Ui, CacheInvalidation, \
 
    UserFollowing, UserLog, User, RepoGroup, PullRequest
 
from kallithea.lib.hooks import log_push_action
 
from kallithea.lib.exceptions import NonRelativePathError, IMCCommitError
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserTemp(object):
 
    def __init__(self, user_id):
 
        self.user_id = user_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.user_id)
 

	
 

	
 
class RepoTemp(object):
 
    def __init__(self, repo_id):
 
        self.repo_id = repo_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.repo_id)
 

	
 

	
 
class CachedRepoList(object):
 
    """
 
    Cached repo list. Uses super-fast in-memory cache after initialization.
 
    """
 

	
 
    def __init__(self, db_repo_list, repos_path, order_by=None, perm_set=None):
 
        self.db_repo_list = db_repo_list
 
        self.repos_path = repos_path
 
        self.order_by = order_by
 
        self.reversed = (order_by or '').startswith('-')
 
        if not perm_set:
 
            perm_set = ['repository.read', 'repository.write',
 
                        'repository.admin']
 
        self.perm_set = perm_set
 

	
 
    def __len__(self):
 
        return len(self.db_repo_list)
 

	
 
    def __repr__(self):
 
        return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
 

	
 
    def __iter__(self):
 
        # pre-propagated valid_cache_keys to save executing select statements
 
        # for each repo
 
        valid_cache_keys = CacheInvalidation.get_valid_cache_keys()
 

	
 
        for dbr in self.db_repo_list:
 
            scmr = dbr.scm_instance_cached(valid_cache_keys)
 
            # check permission at this level
 
            if not HasRepoPermissionAny(
 
                *self.perm_set)(dbr.repo_name, 'get repo check'):
 
                continue
 

	
 
            try:
 
                last_change = scmr.last_change
 
                tip = h.get_changeset_safe(scmr, 'tip')
 
            except Exception:
 
                log.error(
 
                    '%s this repository is present in database but it '
 
                    'cannot be created as an scm instance, org_exc:%s'
 
                    % (dbr.repo_name, traceback.format_exc())
 
                )
 
                continue
 

	
 
            tmp_d = {}
 
            tmp_d['name'] = dbr.repo_name
 
            tmp_d['name_sort'] = tmp_d['name'].lower()
 
            tmp_d['raw_name'] = tmp_d['name'].lower()
 
            tmp_d['description'] = dbr.description
 
            tmp_d['description_sort'] = tmp_d['description'].lower()
 
            tmp_d['last_change'] = last_change
 
            tmp_d['last_change_sort'] = time.mktime(last_change.timetuple())
 
            tmp_d['tip'] = tip.raw_id
 
            tmp_d['tip_sort'] = tip.revision
 
            tmp_d['rev'] = tip.revision
 
            tmp_d['contact'] = dbr.user.full_contact
 
            tmp_d['contact_sort'] = tmp_d['contact']
 
            tmp_d['owner_sort'] = tmp_d['contact']
 
            tmp_d['repo_archives'] = list(scmr._get_archives())
 
            tmp_d['last_msg'] = tip.message
 
            tmp_d['author'] = tip.author
 
            tmp_d['dbrepo'] = dbr.get_dict()
 
            tmp_d['dbrepo_fork'] = dbr.fork.get_dict() if dbr.fork else {}
 
            yield tmp_d
 

	
 

	
 
class SimpleCachedRepoList(CachedRepoList):
 
    """
 
    Lighter version of CachedRepoList without the scm initialisation
 
    """
 

	
 
    def __iter__(self):
 
        for dbr in self.db_repo_list:
 
            # check permission at this level
 
            if not HasRepoPermissionAny(
 
                *self.perm_set)(dbr.repo_name, 'get repo check'):
 
                continue
 

	
 
            tmp_d = {
 
                'name': dbr.repo_name,
 
                'dbrepo': dbr.get_dict(),
 
                'dbrepo_fork': dbr.fork.get_dict() if dbr.fork else {}
 
            }
 
            yield tmp_d
 

	
 

	
 
class _PermCheckIterator(object):
 
    def __init__(self, obj_list, obj_attr, perm_set, perm_checker, extra_kwargs=None):
 
        """
 
        Creates iterator from given list of objects, additionally
 
        checking permission for them from perm_set var
 

	
 
        :param obj_list: list of db objects
 
        :param obj_attr: attribute of object to pass into perm_checker
 
        :param perm_set: list of permissions to check
 
        :param perm_checker: callable to check permissions against
 
        """
 
        self.obj_list = obj_list
 
        self.obj_attr = obj_attr
 
        self.perm_set = perm_set
 
        self.perm_checker = perm_checker
 
        self.extra_kwargs = extra_kwargs or {}
 

	
 
    def __len__(self):
 
        return len(self.obj_list)
 

	
 
    def __repr__(self):
 
        return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
 

	
 
    def __iter__(self):
 
        for db_obj in self.obj_list:
 
            # check permission at this level
 
            name = getattr(db_obj, self.obj_attr, None)
 
            if not self.perm_checker(*self.perm_set)(
 
                    name, self.__class__.__name__, **self.extra_kwargs):
 
                continue
 

	
 
            yield db_obj
 

	
 

	
 
class RepoList(_PermCheckIterator):
 

	
 
    def __init__(self, db_repo_list, perm_set=None, extra_kwargs=None):
 
        if not perm_set:
 
            perm_set = ['repository.read', 'repository.write', 'repository.admin']
 

	
 
        super(RepoList, self).__init__(obj_list=db_repo_list,
 
                    obj_attr='repo_name', perm_set=perm_set,
 
                    perm_checker=HasRepoPermissionAny,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class RepoGroupList(_PermCheckIterator):
 

	
 
    def __init__(self, db_repo_group_list, perm_set=None, extra_kwargs=None):
 
        if not perm_set:
 
            perm_set = ['group.read', 'group.write', 'group.admin']
 

	
 
        super(RepoGroupList, self).__init__(obj_list=db_repo_group_list,
 
                    obj_attr='group_name', perm_set=perm_set,
kallithea/tests/api/api_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
tests for api. run with::
 

	
 
    KALLITHEA_WHOOSH_TEST_DISABLE=1 nosetests --with-coverage --cover-package=kallithea.controllers.api.api -x kallithea/tests/api
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import random
 
import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.db import Repository, User, Setting
 
from kallithea.lib.utils2 import time_to_datetime
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = 'test_user_group'
 
TEST_REPO_GROUP = 'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 

	
 
    :param random_id:
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 

	
 
jsonify = lambda obj: json.loads(json.dumps(obj))
 

	
 

	
 
def crash(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
def api_call(test_obj, params):
 
    response = test_obj.app.post(API_URL, content_type='application/json',
 
                                 params=params)
 
    return response
 

	
 

	
 
## helpers
 
def make_user_group(name=TEST_USER_GROUP):
 
    gr = fixture.create_user_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    UserGroupModel().add_user_to_group(user_group=gr,
 
                                       user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
def make_repo_group(name=TEST_REPO_GROUP):
 
    gr = fixture.create_repo_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
class _BaseTestApi(object):
 
    REPO = None
 
    REPO_TYPE = None
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        cls.usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        cls.apikey = cls.usr.api_key
 
        cls.test_user = UserModel().create_or_update(
 
            username='test-api',
 
            password='test',
 
            email='test@example.com',
 
            firstname='first',
 
            lastname='last'
 
        )
 
        Session().commit()
 
        cls.TEST_USER_LOGIN = cls.test_user.username
 
        cls.apikey_regular = cls.test_user.api_key
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        pass
 

	
 
    def setUp(self):
 
        self.maxDiff = None
 
        make_user_group()
 
        make_repo_group()
 

	
 
    def tearDown(self):
 
        fixture.destroy_user_group(TEST_USER_GROUP)
 
        fixture.destroy_gists()
 
        fixture.destroy_repo_group(TEST_REPO_GROUP)
 

	
 
    def _compare_ok(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': None,
 
            'result': expected
 
        })
 
        given = json.loads(given)
 
        self.assertEqual(expected, given)
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': expected,
 
            'result': None
 
        })
 
        given = json.loads(given)
 
        self.assertEqual(expected, given)
 

	
 
    def test_Optional_object(self):
 
        from kallithea.controllers.api.api import Optional
 

	
 
        option1 = Optional(None)
 
        self.assertEqual('<Optional:%s>' % None, repr(option1))
 
        self.assertEqual(option1(), None)
 

	
 
        self.assertEqual(1, Optional.extract(Optional(1)))
 
        self.assertEqual('trololo', Optional.extract('trololo'))
 

	
 
    def test_Optional_OAttr(self):
 
        from kallithea.controllers.api.api import Optional, OAttr
 

	
 
        option1 = Optional(OAttr('apiuser'))
 
        self.assertEqual('apiuser', Optional.extract(option1))
 

	
 
    def test_OAttr_object(self):
 
        from kallithea.controllers.api.api import OAttr
 

	
 
        oattr1 = OAttr('apiuser')
 
        self.assertEqual('<OptionalAttr:apiuser>', repr(oattr1))
 
        self.assertEqual(oattr1(), oattr1)
 

	
 
    def test_api_wrong_key(self):
 
        id_, params = _build_data('trololo', 'get_user')
 
        response = api_call(self, params)
 

	
 
        expected = 'Invalid API key'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_null(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_args_is_null(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 
        self.assertEqual(response.status, '200 OK')
 

	
 
    def test_api_args_is_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 
        self.assertEqual(response.status, '200 OK')
 

	
 
    def test_api_args_different_args(self):
 
        import string
 
        expected = {
 
            'ascii_letters': string.ascii_letters,
 
            'ws': string.whitespace,
 
            'printables': string.printable
 
        }
kallithea/tests/functional/test_admin.py
Show inline comments
 
from __future__ import with_statement
 
import os
 
import csv
 
import datetime
 
from kallithea.tests import *
 
from kallithea.model.db import UserLog
 
from kallithea.model.meta import Session
 
from kallithea.lib.utils2 import safe_unicode
 

	
 
dn = os.path.dirname
 
FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'fixtures')
 

	
 

	
 
class TestAdminController(TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        def strptime(val):
 
            fmt = '%Y-%m-%d %H:%M:%S'
 
            if '.' not in val:
 
                return datetime.datetime.strptime(val, fmt)
 

	
 
            nofrag, frag = val.split(".")
 
            date = datetime.datetime.strptime(nofrag, fmt)
 

	
 
            frag = frag[:6]  # truncate to microseconds
 
            frag += (6 - len(frag)) * '0'  # add 0s
 
            return date.replace(microsecond=int(frag))
 

	
 
        with open(os.path.join(FIXTURES, 'journal_dump.csv')) as f:
 
            for row in csv.DictReader(f):
 
                ul = UserLog()
 
                for k, v in row.iteritems():
 
                    v = safe_unicode(v)
 
                    if k == 'action_date':
 
                        v = strptime(v)
 
                    if k in ['user_id', 'repository_id']:
 
                        # nullable due to FK problems
 
                        v = None
 
                    setattr(ul, k, v)
 
                Session().add(ul)
 
            Session().commit()
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index'))
 
        response.mustcontain('Admin Journal')
 

	
 
    def test_filter_all_entries(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',))
 
        response.mustcontain('2034 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:xxx'))
 
        response.mustcontain('3 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_repository_CamelCase(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:XxX'))
 
        response.mustcontain('3 Entries')
 

	
 
    def test_filter_journal_filter_wildcard_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:*test*'))
 
        response.mustcontain('862 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:test*'))
 
        response.mustcontain('257 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_CamelCase(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:Test*'))
 
        response.mustcontain('257 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_and_user(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:test* AND username:demo'))
 
        response.mustcontain('130 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_or_other_repo(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='repository:test* OR repository:xxx'))
 
        response.mustcontain('260 Entries')  # 257 + 3
 

	
 
    def test_filter_journal_filter_exact_match_on_username(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='username:demo'))
 
        response.mustcontain('1087 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_username_camelCase(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='username:DemO'))
 
        response.mustcontain('1087 Entries')
 

	
 
    def test_filter_journal_filter_wildcard_on_username(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='username:*test*'))
 
        response.mustcontain('100 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_username(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='username:demo*'))
 
        response.mustcontain('1101 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_user_or_other_user(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='username:demo OR username:volcan'))
 
        response.mustcontain('1095 Entries')  # 1087 + 8
 

	
 
    def test_filter_journal_filter_wildcard_on_action(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='action:*pull_request*'))
 
        response.mustcontain('187 Entries')
 

	
 
    def test_filter_journal_filter_on_date(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='date:20121010'))
 
        response.mustcontain('47 Entries')
 

	
 
    def test_filter_journal_filter_on_date_2(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
                                    filter='date:20121020'))
 
        response.mustcontain('17 Entries')
kallithea/tests/functional/test_login.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from __future__ import with_statement
 
import re
 
import time
 

	
 
import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.utils2 import generate_api_key
 
from kallithea.lib.auth import check_password
 
from kallithea.lib import helpers as h
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model import validators
 
from kallithea.model.db import User, Notification
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestLoginController(TestController):
 
    def setUp(self):
 
        self.remove_all_notifications()
 
        self.assertEqual(Notification.query().all(), [])
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertEqual(response.status, '200 OK')
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        self.assert_authenticated_user(response, TEST_USER_ADMIN_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        response = response.follow()
 

	
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('Users Administration')
 

	
 
    def test_login_do_not_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': False})
 

	
 
        self.assertIn('Set-Cookie', response.headers)
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            self.assertFalse(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
 
                'Cookie %r has expiration date, but should be a session cookie' % cookie)
 

	
 
    def test_login_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': True})
 

	
 
        self.assertIn('Set-Cookie', response.headers)
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            self.assertTrue(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
 
                'Cookie %r should have expiration date, but is a session cookie' % cookie)
 

	
 
    def test_logout(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        # Verify that a login session has been established.
 
        response = self.app.get(url(controller='login', action='index'))
 
        response = response.follow()
 
        self.assertIn('authuser', response.session)
 

	
 
        response.click('Log Out')
 

	
 
        # Verify that the login session has been terminated.
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertNotIn('authuser', response.session)
 

	
 
    @parameterized.expand([
 
          ('data:text/html,<script>window.alert("xss")</script>',),
 
          ('mailto:test@example.com',),
 
          ('file:///etc/passwd',),
 
          ('ftp://some.ftp.server',),
 
          ('http://other.domain/bl%C3%A5b%C3%A6rgr%C3%B8d',),
 
    ])
 
    def test_login_bad_came_froms(self, url_came_from):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=url_came_from),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertEqual(response._environ['paste.testing_variables']
 
                         ['tmpl_context'].came_from, '/')
 
        response = response.follow()
 

	
 
        self.assertEqual(response.status, '200 OK')
 

	
 
    def test_login_short_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'as'})
 
        self.assertEqual(response.status, '200 OK')
 

	
 
        response.mustcontain('Enter 3 characters or more')
 

	
 
    def test_login_wrong_username_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': 'error',
 
                                  'password': 'test12'})
 

	
 
        response.mustcontain('Invalid username or password')
 

	
 
    # verify that get arguments are correctly passed along login redirection
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_redirection_to_login_form_preserves_get_args(self, args, args_encoded):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='summary', action='index',
 
                                        repo_name=HG_REPO,
 
                                        **args))
 
            self.assertEqual(response.status, '302 Found')
 
            for encoded in args_encoded:
 
                self.assertIn(encoded, response.location)
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_preserves_get_args(self, args, args_encoded):
 
        response = self.app.get(url(controller='login', action='index',
 
                                    came_from = '/_admin/users',
 
                                    **args))
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.form.action)
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_redirection_after_successful_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from = '/_admin/users',
 
                                     **args),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.location)
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_after_incorrect_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from = '/_admin/users',
 
                                     **args),
 
                                 {'username': 'error',
 
                                  'password': 'test12'})
 

	
 
        response.mustcontain('Invalid username or password')
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.form.action)
kallithea/tests/models/test_diff_parsers.py
Show inline comments
 
from __future__ import with_statement
 
from kallithea.tests import *
 
from kallithea.lib.diffs import DiffProcessor, NEW_FILENODE, DEL_FILENODE, \
 
    MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE, COPIED_FILENODE
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 

	
 
DIFF_FIXTURES = {
 
    'hg_diff_add_single_binary_file.diff': [
 
        ('US Warszawa.jpg', 'A',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {NEW_FILENODE: 'new file 100755',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
    ],
 
    'hg_diff_mod_single_binary_file.diff': [
 
        ('US Warszawa.jpg', 'M',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {MOD_FILENODE: 'modified file',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
    ],
 

	
 
    'hg_diff_mod_single_file_and_rename_and_chmod.diff': [
 
        ('README', 'R',
 
         {'added': 3,
 
          'deleted': 0,
 
          'binary': False,
 
          'ops': {RENAMED_FILENODE: 'file renamed from README.rst to README',
 
                  CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
 
    ],
 
    'hg_diff_mod_file_and_rename.diff': [
 
        ('README.rst', 'R',
 
         {'added': 3,
 
          'deleted': 0,
 
          'binary': False,
 
          'ops': {RENAMED_FILENODE: 'file renamed from README to README.rst'}}),
 
    ],
 
    'hg_diff_del_single_binary_file.diff': [
 
        ('US Warszawa.jpg', 'D',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {DEL_FILENODE: 'deleted file',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
    ],
 
    'hg_diff_chmod_and_mod_single_binary_file.diff': [
 
        ('gravatar.png', 'M',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
    ],
 
    'hg_diff_chmod.diff': [
 
        ('file', 'M',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
 
    ],
 
    'hg_diff_rename_file.diff': [
 
        ('file_renamed', 'R',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {RENAMED_FILENODE: 'file renamed from file to file_renamed'}}),
 
    ],
 
    'hg_diff_rename_and_chmod_file.diff': [
 
        ('README', 'R',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
 
                  RENAMED_FILENODE: 'file renamed from README.rst to README'}}),
 
    ],
 
    'hg_diff_binary_and_normal.diff': [
 
        ('img/baseline-10px.png', 'A',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {NEW_FILENODE: 'new file 100644',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
        ('img/baseline-20px.png', 'D',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {DEL_FILENODE: 'deleted file',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
        ('index.html', 'M',
 
         {'added': 3,
 
          'deleted': 2,
 
          'binary': False,
 
          'ops': {MOD_FILENODE: 'modified file'}}),
 
        ('js/global.js', 'D',
 
         {'added': 0,
 
          'deleted': 75,
 
          'binary': False,
 
          'ops': {DEL_FILENODE: 'deleted file'}}),
 
        ('js/jquery/hashgrid.js', 'A',
 
         {'added': 340,
 
          'deleted': 0,
 
          'binary': False,
 
          'ops': {NEW_FILENODE: 'new file 100755'}}),
 
        ('less/docs.less', 'M',
 
         {'added': 34,
 
          'deleted': 0,
 
          'binary': False,
 
          'ops': {MOD_FILENODE: 'modified file'}}),
 
        ('less/scaffolding.less', 'M',
 
         {'added': 1,
 
          'deleted': 3,
 
          'binary': False,
 
          'ops': {MOD_FILENODE: 'modified file'}}),
 
        ('readme.markdown', 'M',
 
         {'added': 1,
 
          'deleted': 10,
 
          'binary': False,
 
          'ops': {MOD_FILENODE: 'modified file'}}),
 
    ],
 
    'git_diff_chmod.diff': [
 
        ('work-horus.xls', 'M',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755'}})
 
    ],
 
    'git_diff_rename_file.diff': [
 
        ('file.xls', 'R',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {RENAMED_FILENODE: 'file renamed from work-horus.xls to file.xls'}})
 
    ],
 
    'git_diff_mod_single_binary_file.diff': [
 
        ('US Warszawa.jpg', 'M',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {MOD_FILENODE: 'modified file',
 
                  BIN_FILENODE: 'binary diff not shown'}})
 
    ],
 
    'git_diff_binary_and_normal.diff': [
 
        ('img/baseline-10px.png', 'A',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {NEW_FILENODE: 'new file 100644',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
        ('img/baseline-20px.png', 'D',
 
         {'added': 0,
 
          'deleted': 0,
 
          'binary': True,
 
          'ops': {DEL_FILENODE: 'deleted file',
 
                  BIN_FILENODE: 'binary diff not shown'}}),
 
        ('index.html', 'M',
 
         {'added': 3,
 
          'deleted': 2,
 
          'binary': False,
 
          'ops': {MOD_FILENODE: 'modified file'}}),
 
        ('js/global.js', 'D',
 
         {'added': 0,
 
          'deleted': 75,
 
          'binary': False,
 
          'ops': {DEL_FILENODE: 'deleted file'}}),
 
        ('js/jquery/hashgrid.js', 'A',
 
         {'added': 340,
 
          'deleted': 0,
 
          'binary': False,
 
          'ops': {NEW_FILENODE: 'new file 100755'}}),
 
        ('less/docs.less', 'M',
 
         {'added': 34,
 
          'deleted': 0,
 
          'binary': False,
 
          'ops': {MOD_FILENODE: 'modified file'}}),
 
        ('less/scaffolding.less', 'M',
 
         {'added': 1,
 
          'deleted': 3,
 
          'binary': False,
 
          'ops': {MOD_FILENODE: 'modified file'}}),
 
        ('readme.markdown', 'M',
 
         {'added': 1,
 
          'deleted': 10,
 
          'binary': False,
 
          'ops': {MOD_FILENODE: 'modified file'}}),
 
    ],
 
    'diff_with_diff_data.diff': [
 
        ('vcs/backends/base.py', 'M',
 
         {'added': 18,
kallithea/tests/other/test_libs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.other.test_libs
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Package for testing various lib/helper functions in kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 9, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import with_statement
 
import datetime
 
import hashlib
 
import mock
 
from kallithea.tests import *
 
from kallithea.lib.utils2 import AttributeDict
 
from kallithea.model.db import Repository
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
 
     '%s://domain.org' % proto),
 
    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
 
                                                '8080'],
 
     '%s://domain.org:8080' % proto),
 
]
 

	
 
proto = 'https'
 
TEST_URLS += [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
 
     '%s://domain.org' % proto),
 
    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
 
                                                '8080'],
 
     '%s://domain.org:8080' % proto),
 
]
 

	
 

	
 
class TestLibs(BaseTestCase):
 

	
 
    @parameterized.expand(TEST_URLS)
 
    def test_uri_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import uri_filter
 
        self.assertEqual(uri_filter(test_url), expected)
 

	
 
    @parameterized.expand(TEST_URLS)
 
    def test_credentials_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import credentials_filter
 
        self.assertEqual(credentials_filter(test_url), expected_creds)
 

	
 
    @parameterized.expand([('t', True),
 
                           ('true', True),
 
                           ('y', True),
 
                           ('yes', True),
 
                           ('on', True),
 
                           ('1', True),
 
                           ('Y', True),
 
                           ('yeS', True),
 
                           ('Y', True),
 
                           ('TRUE', True),
 
                           ('T', True),
 
                           ('False', False),
 
                           ('F', False),
 
                           ('FALSE', False),
 
                           ('0', False),
 
                           ('-1', False),
 
                           ('', False)
 
    ])
 
    def test_str2bool(self, str_bool, expected):
 
        from kallithea.lib.utils2 import str2bool
 
        self.assertEqual(str2bool(str_bool), expected)
 

	
 
    def test_mention_extractor(self):
 
        from kallithea.lib.utils2 import extract_mentioned_users
 
        sample = (
 
            "@first hi there @world here's my email username@email.com "
 
            "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
 
            "@UPPER    @cAmEL @2one_more22 @john please see this http://org.pl "
 
            "@marian.user just do it @marco-polo and next extract @marco_polo "
 
            "user.dot  hej ! not-needed maril@domain.org"
 
        )
 

	
 
        s = sorted([
 
            '2one_more22', 'first', 'lukaszb', 'one', 'one_more22', 'UPPER', 'cAmEL', 'john',
 
            'marian.user', 'marco-polo', 'marco_polo', 'world'], key=lambda k: k.lower())
 
        self.assertEqual(s, extract_mentioned_users(sample))
 

	
 
    @parameterized.expand([
 
        (dict(), u'just now'),
 
        (dict(seconds= -1), u'1 second ago'),
 
        (dict(seconds= -60 * 2), u'2 minutes ago'),
 
        (dict(hours= -1), u'1 hour ago'),
 
        (dict(hours= -24), u'1 day ago'),
 
        (dict(hours= -24 * 5), u'5 days ago'),
 
        (dict(months= -1), u'1 month ago'),
 
        (dict(months= -1, days= -2), u'1 month and 2 days ago'),
 
        (dict(months= -1, days= -20), u'1 month and 19 days ago'),
 
        (dict(years= -1, months= -1), u'1 year and 1 month ago'),
 
        (dict(years= -1, months= -10), u'1 year and 10 months ago'),
 
        (dict(years= -2, months= -4), u'2 years and 4 months ago'),
 
        (dict(years= -2, months= -11), u'2 years and 11 months ago'),
 
        (dict(years= -3, months= -2), u'3 years and 2 months ago'),
 
    ])
 
    def test_age(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        n = datetime.datetime(year=2012, month=5, day=17)
 
        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
        self.assertEqual(age(n + delt(**age_args), now=n), expected)
 

	
 
    @parameterized.expand([
 
        (dict(), u'just now'),
 
        (dict(seconds= -1), u'1 second ago'),
 
        (dict(seconds= -60 * 2), u'2 minutes ago'),
 
        (dict(hours= -1), u'1 hour ago'),
 
        (dict(hours= -24), u'1 day ago'),
 
        (dict(hours= -24 * 5), u'5 days ago'),
 
        (dict(months= -1), u'1 month ago'),
 
        (dict(months= -1, days= -2), u'1 month ago'),
 
        (dict(months= -1, days= -20), u'1 month ago'),
 
        (dict(years= -1, months= -1), u'13 months ago'),
 
        (dict(years= -1, months= -10), u'22 months ago'),
 
        (dict(years= -2, months= -4), u'2 years ago'),
 
        (dict(years= -2, months= -11), u'3 years ago'),
 
        (dict(years= -3, months= -2), u'3 years ago'),
 
        (dict(years= -4, months= -8), u'5 years ago'),
 
    ])
 
    def test_age_short(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        n = datetime.datetime(year=2012, month=5, day=17)
 
        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
        self.assertEqual(age(n + delt(**age_args), show_short_version=True, now=n), expected)
 

	
 
    @parameterized.expand([
 
        (dict(), u'just now'),
 
        (dict(seconds=1), u'in 1 second'),
 
        (dict(seconds=60 * 2), u'in 2 minutes'),
 
        (dict(hours=1), u'in 1 hour'),
 
        (dict(hours=24), u'in 1 day'),
 
        (dict(hours=24 * 5), u'in 5 days'),
 
        (dict(months=1), u'in 1 month'),
 
        (dict(months=1, days=1), u'in 1 month and 1 day'),
 
        (dict(years=1, months=1), u'in 1 year and 1 month')
 
    ])
 
    def test_age_in_future(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        n = datetime.datetime(year=2012, month=5, day=17)
 
        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
        self.assertEqual(age(n + delt(**age_args), now=n), expected)
 

	
 
    def test_tag_exctrator(self):
 
        sample = (
 
            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
 
            "[requires] [stale] [see<>=>] [see => http://url.com]"
 
            "[requires => url] [lang => python] [just a tag]"
 
            "[,d] [ => ULR ] [obsolete] [desc]]"
 
        )
 
        from kallithea.lib.helpers import urlify_text
 
        res = urlify_text(sample, stylize=True)
 
        self.assertIn('<div class="metatag" tag="tag">tag</div>', res)
 
        self.assertIn('<div class="metatag" tag="obsolete">obsolete</div>', res)
 
        self.assertIn('<div class="metatag" tag="stale">stale</div>', res)
 
        self.assertIn('<div class="metatag" tag="lang">python</div>', res)
 
        self.assertIn('<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>', res)
 
        self.assertIn('<div class="metatag" tag="tag">tag</div>', res)
 

	
 
    def test_alternative_gravatar(self):
 
        from kallithea.lib.helpers import gravatar_url
 
        _md5 = lambda s: hashlib.md5(s).hexdigest()
 

	
 
        #mock pylons.url
 
        class fake_url(object):
 
            @classmethod
 
            def current(cls, *args, **kwargs):
 
                return 'https://server.com'
 

	
 
        #mock pylons.tmpl_context
 
        def fake_tmpl_context(_url):
 
            _c = AttributeDict()
 
            _c.visual = AttributeDict()
 
            _c.visual.use_gravatar = True
 
            _c.visual.gravatar_url = _url
 

	
 
            return _c
 

	
 

	
kallithea/tests/vcs/test_archives.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
import tarfile
 
import zipfile
 
import datetime
 
import tempfile
 
import StringIO
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class ArchivesTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('%d/file_%d.txt' % (x, x),
 
                        content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_archive_zip(self):
 
        path = tempfile.mkstemp()[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='zip', prefix='repo')
 
        out = zipfile.ZipFile(path)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            decompressed = StringIO.StringIO()
 
            decompressed.write(out.read('repo/' + node_path))
 
            self.assertEqual(
 
                decompressed.getvalue(),
 
                self.tip.get_node(node_path).content)
 

	
 
    def test_archive_tgz(self):
 
        path = tempfile.mkstemp()[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='tgz', prefix='repo')
 
        outdir = tempfile.mkdtemp()
 

	
 
        outfile = tarfile.open(path, 'r|gz')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            self.assertEqual(
 
                open(os.path.join(outdir, 'repo/' + node_path)).read(),
 
                self.tip.get_node(node_path).content)
 

	
 
    def test_archive_tbz2(self):
 
        path = tempfile.mkstemp()[1]
 
        with open(path, 'w+b') as f:
 
            self.tip.fill_archive(stream=f, kind='tbz2', prefix='repo')
 
        outdir = tempfile.mkdtemp()
 

	
 
        outfile = tarfile.open(path, 'r|bz2')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            self.assertEqual(
 
                open(os.path.join(outdir, 'repo/' + node_path)).read(),
 
                self.tip.get_node(node_path).content)
 

	
 
    def test_archive_default_stream(self):
 
        tmppath = tempfile.mkstemp()[1]
 
        with open(tmppath, 'w') as stream:
 
            self.tip.fill_archive(stream=stream)
 
        mystream = StringIO.StringIO()
 
        self.tip.fill_archive(stream=mystream)
 
        mystream.seek(0)
 
        with open(tmppath, 'r') as f:
 
            self.assertEqual(f.read(), mystream.read())
 

	
 
    def test_archive_wrong_kind(self):
 
        with self.assertRaises(VCSError):
 
            self.tip.fill_archive(kind='wrong kind')
 

	
 
    def test_archive_empty_prefix(self):
 
        with self.assertRaises(VCSError):
 
            self.tip.fill_archive(prefix='')
 

	
 
    def test_archive_prefix_with_leading_slash(self):
 
        with self.assertRaises(VCSError):
 
            self.tip.fill_archive(prefix='/any')
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s archive test' % alias).title().split())
 
    bases = (ArchivesTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_branches.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.lib import vcs
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.lib.vcs.nodes import FileNode
 

	
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 

	
 

	
 
class BranchesTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='Foobar'),
 
                    FileNode('foobar2', content='Foobar II'),
 
                    FileNode('foo/bar/baz', content='baz here!'),
 
                ],
 
            },
 
            {
 
                'message': 'Changes...',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('some/new.txt', content='news...'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'Foobar I'),
 
                ],
 
                'removed': [],
 
            },
 
        ]
 
        return commits
 

	
 
    def test_simple(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.date, datetime.datetime(2010, 1, 1, 21))
 

	
 
    def test_new_branch(self):
 
        # This check must not be removed to ensure the 'branches' LazyProperty
 
        # gets hit *before* the new 'foobar' branch got created:
 
        self.assertFalse('foobar' in self.repo.branches)
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertTrue('foobar' in self.repo.branches)
 
        self.assertEqual(foobar_tip.branch, 'foobar')
 

	
 
    def test_new_head(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
 
        )
 

	
 
        newest_tip = self.imc.commit(
 
            message=u'Merged with %s' % foobar_tip.raw_id,
 
            author=u'joe',
 
            branch=self.backend_class.DEFAULT_BRANCH_NAME,
 
            parents=[newtip, foobar_tip],
 
        )
 

	
 
        self.assertEqual(newest_tip.branch,
 
            self.backend_class.DEFAULT_BRANCH_NAME)
 

	
 
    def test_branch_with_slash_in_name(self):
 
        self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n'))
 
        self.imc.commit(u'Branch with a slash!', author=u'joe',
 
            branch='issue/123')
 
        self.assertTrue('issue/123' in self.repo.branches)
 

	
 
    def test_branch_with_slash_in_name_and_similar_without(self):
 
        self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n'))
 
        self.imc.commit(u'Branch with a slash!', author=u'joe',
 
            branch='issue/123')
 
        self.imc.add(vcs.nodes.FileNode('extrafile II', content='Some data\n'))
 
        self.imc.commit(u'Branch without a slash...', author=u'joe',
 
            branch='123')
 
        self.assertIn('issue/123', self.repo.branches)
 
        self.assertIn('123', self.repo.branches)
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s branches test' % alias).title().split())
 
    bases = (BranchesTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_changesets.py
Show inline comments
 
# encoding: utf8
 
from __future__ import with_statement
 

	
 
import time
 
import datetime
 
from kallithea.lib import vcs
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 

	
 
from kallithea.lib.vcs.backends.base import BaseChangeset
 
from kallithea.lib.vcs.nodes import (
 
    FileNode, AddedFileNodesGenerator,
 
    ChangedFileNodesGenerator, RemovedFileNodesGenerator
 
)
 
from kallithea.lib.vcs.exceptions import (
 
    BranchDoesNotExistError, ChangesetDoesNotExistError,
 
    RepositoryError, EmptyRepositoryError
 
)
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.tests.vcs.conf import get_new_dir
 

	
 

	
 
class TestBaseChangeset(unittest.TestCase):
 

	
 
    def test_as_dict(self):
 
        changeset = BaseChangeset()
 
        changeset.id = 'ID'
 
        changeset.raw_id = 'RAW_ID'
 
        changeset.short_id = 'SHORT_ID'
 
        changeset.revision = 1009
 
        changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
 
        changeset.message = 'Message of a commit'
 
        changeset.author = 'Joe Doe <joe.doe@example.com>'
 
        changeset.added = [FileNode('foo/bar/baz'), FileNode('foobar')]
 
        changeset.changed = []
 
        changeset.removed = []
 
        self.assertEqual(changeset.as_dict(), {
 
            'id': 'ID',
 
            'raw_id': 'RAW_ID',
 
            'short_id': 'SHORT_ID',
 
            'revision': 1009,
 
            'date': datetime.datetime(2011, 1, 30, 1, 45),
 
            'message': 'Message of a commit',
 
            'author': {
 
                'name': 'Joe Doe',
 
                'email': 'joe.doe@example.com',
 
            },
 
            'added': ['foo/bar/baz', 'foobar'],
 
            'changed': [],
 
            'removed': [],
 
        })
 

	
 
class _ChangesetsWithCommitsTestCaseixin(_BackendTestMixin):
 
    recreate_repo_per_test = True
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_new_branch(self):
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertTrue('foobar' in self.repo.branches)
 
        self.assertEqual(foobar_tip.branch, 'foobar')
 
        # 'foobar' should be the only branch that contains the new commit
 
        self.assertNotEqual(*self.repo.branches.values())
 

	
 
    def test_new_head_in_default_branch(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
 
        )
 

	
 
        newest_tip = self.imc.commit(
 
            message=u'Merged with %s' % foobar_tip.raw_id,
 
            author=u'joe',
 
            branch=self.backend_class.DEFAULT_BRANCH_NAME,
 
            parents=[newtip, foobar_tip],
 
        )
 

	
 
        self.assertEqual(newest_tip.branch,
 
            self.backend_class.DEFAULT_BRANCH_NAME)
 

	
 
    def test_get_changesets_respects_branch_name(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        doc_changeset = self.imc.commit(
 
            message=u'New branch: docs',
 
            author=u'joe',
 
            branch='docs',
 
        )
 
        self.imc.add(vcs.nodes.FileNode('newfile', content=''))
 
        self.imc.commit(
 
            message=u'Back in default branch',
 
            author=u'joe',
 
            parents=[tip],
 
        )
 
        default_branch_changesets = self.repo.get_changesets(
 
            branch_name=self.repo.DEFAULT_BRANCH_NAME)
 
        self.assertNotIn(doc_changeset, default_branch_changesets)
 

	
 
    def test_get_changeset_by_branch(self):
 
        for branch, sha in self.repo.branches.iteritems():
 
            self.assertEqual(sha, self.repo.get_changeset(branch).raw_id)
 

	
 
    def test_get_changeset_by_tag(self):
 
        for tag, sha in self.repo.tags.iteritems():
 
            self.assertEqual(sha, self.repo.get_changeset(tag).raw_id)
 

	
 
    def test_get_changeset_parents(self):
 
        for test_rev in [1, 2, 3]:
 
            sha = self.repo.get_changeset(test_rev-1)
 
            self.assertEqual([sha], self.repo.get_changeset(test_rev).parents)
 

	
 
    def test_get_changeset_children(self):
 
        for test_rev in [1, 2, 3]:
 
            sha = self.repo.get_changeset(test_rev+1)
 
            self.assertEqual([sha], self.repo.get_changeset(test_rev).children)
 

	
 

	
 
class _ChangesetsTestCaseMixin(_BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': u'Commit %d' % x,
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_simple(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.date, datetime.datetime(2010, 1, 3, 20))
 

	
 
    def test_get_changesets_is_ordered_by_date(self):
 
        changesets = list(self.repo.get_changesets())
 
        ordered_by_date = sorted(changesets,
 
            key=lambda cs: cs.date)
 
        self.assertItemsEqual(changesets, ordered_by_date)
 

	
 
    def test_get_changesets_respects_start(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        self.assertEqual(len(changesets), 4)
 

	
 
    def test_get_changesets_numerical_id_respects_start(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        self.assertEqual(len(changesets), 4)
 

	
 
    def test_get_changesets_includes_start_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        self.assertEqual(changesets[0].raw_id, second_id)
 

	
 
    def test_get_changesets_respects_end(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets[-1].raw_id, second_id)
 
        self.assertEqual(len(changesets), 2)
kallithea/tests/vcs/test_filenodes_unicode_path.py
Show inline comments
 
# encoding: utf8
 

	
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.tests.vcs.test_inmemchangesets import BackendBaseTestCase
 
from kallithea.tests.vcs.conf import SCM_TESTS
 

	
 

	
 
class FileNodeUnicodePathTestsMixin(object):
 

	
 
    fname = 'ąśðąęłąć.txt'
 
    ufname = (fname).decode('utf-8')
 

	
 
    def get_commits(self):
 
        self.nodes = [
 
            FileNode(self.fname, content='Foobar'),
 
        ]
 

	
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': self.nodes,
 
            },
 
        ]
 
        return commits
 

	
 
    def test_filenode_path(self):
 
        node = self.tip.get_node(self.fname)
 
        unode = self.tip.get_node(self.ufname)
 
        self.assertEqual(node, unode)
 

	
 

	
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s file node unicode path test' % alias).title()
 
        .split())
 
    bases = (FileNodeUnicodePathTestsMixin, BackendBaseTestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_getitem.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class GetitemTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getitem__last_item_is_tip(self):
 
        self.assertEqual(self.repo[-1], self.repo.get_changeset())
 

	
 
    def test__getitem__returns_correct_items(self):
 
        changesets = [self.repo[x] for x in xrange(len(self.repo.revisions))]
 
        self.assertEqual(changesets, list(self.repo.get_changesets()))
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s getitem test' % alias).title().split())
 
    bases = (GetitemTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_getslice.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class GetsliceTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getslice__last_item_is_tip(self):
 
        self.assertEqual(list(self.repo[-1:])[0], self.repo.get_changeset())
 

	
 
    def test__getslice__respects_start_index(self):
 
        self.assertEqual(list(self.repo[2:]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[2:]])
 

	
 
    def test__getslice__respects_negative_start_index(self):
 
        self.assertEqual(list(self.repo[-2:]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[-2:]])
 

	
 
    def test__getslice__respects_end_index(self):
 
        self.assertEqual(list(self.repo[:2]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[:2]])
 

	
 
    def test__getslice__respects_negative_end_index(self):
 
        self.assertEqual(list(self.repo[:-2]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[:-2]])
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s getslice test' % alias).title().split())
 
    bases = (GetsliceTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_git.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
import sys
 
import mock
 
import datetime
 
import urllib2
 
from kallithea.lib.vcs.backends.git import GitRepository, GitChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import NodeKind, FileNode, DirNode, NodeState
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.model.scm import ScmModel
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
 

	
 

	
 
class GitRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_GIT_REPO_CLONE):
 
            self.fail('Cannot test git clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_GIT_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        self.assertRaises(RepositoryError, GitRepository, wrong_repo_path)
 

	
 
    def test_git_cmd_injection(self):
 
        repo_inject_path = TEST_GIT_REPO + '; echo "Cake";'
 
        with self.assertRaises(urllib2.URLError):
 
            # Should fail because URL will contain the parts after ; too
 
            urlerror_fail_repo = GitRepository(get_new_dir('injection-repo'), src_url=repo_inject_path, update_after_clone=True, create=True)
 

	
 
        with self.assertRaises(RepositoryError):
 
            # Should fail on direct clone call, which as of this writing does not happen outside of class
 
            clone_fail_repo = GitRepository(get_new_dir('injection-repo'), create=True)
 
            clone_fail_repo.clone(repo_inject_path, update_after_clone=True,)
 

	
 
        # Verify correct quoting of evil characters that should work on posix file systems
 
        if sys.platform == 'win32':
 
            # windows does not allow '"' in dir names
 
            tricky_path = get_new_dir("tricky-path-repo-$'`")
 
        else:
 
            tricky_path = get_new_dir("tricky-path-repo-$'\"`")
 
        successfully_cloned = GitRepository(tricky_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
 
        # Repo should have been created
 
        self.assertFalse(successfully_cloned._repo.bare)
 

	
 
        if sys.platform == 'win32':
 
            # windows does not allow '"' in dir names
 
            tricky_path_2 = get_new_dir("tricky-path-2-repo-$'`")
 
        else:
 
            tricky_path_2 = get_new_dir("tricky-path-2-repo-$'\"`")
 
        successfully_cloned2 = GitRepository(tricky_path_2, src_url=tricky_path, bare=True, create=True)
 
        # Repo should have been created and thus used correct quoting for clone
 
        self.assertTrue(successfully_cloned2._repo.bare)
 

	
 
        # Should pass because URL has been properly quoted
 
        successfully_cloned.pull(tricky_path_2)
 
        successfully_cloned2.fetch(tricky_path)
 

	
 
    def test_repo_create_with_spaces_in_path(self):
 
        repo_path = get_new_dir("path with spaces")
 
        repo = GitRepository(repo_path, src_url=None, bare=True, create=True)
 
        # Repo should have been created
 
        self.assertTrue(repo._repo.bare)
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = GitRepository(TEST_GIT_REPO)
 
        repo_clone = GitRepository(TEST_GIT_REPO_CLONE,
 
            src_url=TEST_GIT_REPO, create=True, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
 

	
 
    def test_repo_clone_with_spaces_in_path(self):
 
        repo_path = get_new_dir("path with spaces")
 
        successfully_cloned = GitRepository(repo_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
 
        # Repo should have been created
 
        self.assertFalse(successfully_cloned._repo.bare)
 

	
 
        successfully_cloned.pull(TEST_GIT_REPO)
 
        self.repo.fetch(repo_path)
 

	
 
    def test_repo_clone_without_create(self):
 
        self.assertRaises(RepositoryError, GitRepository,
 
            TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_with_update'
 
        repo_clone = GitRepository(clone_path,
 
            create=True, src_url=TEST_GIT_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 

	
 
        #check if current workdir was updated
 
        fpath = os.path.join(clone_path, 'MANIFEST.in')
 
        self.assertEqual(True, os.path.isfile(fpath),
 
            'Repo was cloned and updated but file %s could not be found'
 
            % fpath)
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_without_update'
 
        repo_clone = GitRepository(clone_path,
 
            create=True, src_url=TEST_GIT_REPO, update_after_clone=False)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        #check if current workdir was *NOT* updated
 
        fpath = os.path.join(clone_path, 'MANIFEST.in')
 
        # Make sure it's not bare repo
 
        self.assertFalse(repo_clone._repo.bare)
 
        self.assertEqual(False, os.path.isfile(fpath),
 
            'Repo was cloned and updated but file %s was found'
 
            % fpath)
 

	
 
    def test_repo_clone_into_bare_repo(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_bare.git'
 
        repo_clone = GitRepository(clone_path, create=True,
 
            src_url=repo.path, bare=True)
 
        self.assertTrue(repo_clone._repo.bare)
 

	
 
    def test_create_repo_is_not_bare_by_default(self):
 
        repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
 
        self.assertFalse(repo._repo.bare)
 

	
 
    def test_create_bare_repo(self):
 
        repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
 
        self.assertTrue(repo._repo.bare)
 

	
 
    def test_revisions(self):
 
        # there are 112 revisions (by now)
 
        # so we can assume they would be available from now on
 
        subset = set([
 
            'c1214f7e79e02fc37156ff215cd71275450cffc3',
 
            '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
 
            'fa6600f6848800641328adbf7811fd2372c02ab2',
 
            '102607b09cdd60e2793929c4f90478be29f85a17',
 
            '49d3fd156b6f7db46313fac355dca1a0b94a0017',
 
            '2d1028c054665b962fa3d307adfc923ddd528038',
 
            'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
 
            'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
 
            'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
 
            '8430a588b43b5d6da365400117c89400326e7992',
 
            'd955cd312c17b02143c04fa1099a352b04368118',
 
            'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
 
            'add63e382e4aabc9e1afdc4bdc24506c269b7618',
 
            'f298fe1189f1b69779a4423f40b48edf92a703fc',
 
            'bd9b619eb41994cac43d67cf4ccc8399c1125808',
 
            '6e125e7c890379446e98980d8ed60fba87d0f6d1',
 
            'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
 
            '0b05e4ed56c802098dfc813cbe779b2f49e92500',
 
            '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
 
            '45223f8f114c64bf4d6f853e3c35a369a6305520',
 
            'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
 
            'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
 
            '27d48942240f5b91dfda77accd2caac94708cc7d',
 
            '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
 
            'e686b958768ee96af8029fe19c6050b1a8dd3b2b'])
 
        self.assertTrue(subset.issubset(set(self.repo.revisions)))
 

	
 

	
 

	
 
    def test_slicing(self):
 
        #4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            self.assertEqual(len(revs), size)
 
            self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
 
            self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
 

	
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 
        # Removed (those are 'remotes' branches for cloned repo)
 
        #self.assertTrue('master' in self.repo.branches)
 
        #self.assertTrue('gittree' in self.repo.branches)
 
        #self.assertTrue('web-branch' in self.repo.branches)
 
        for name, id in self.repo.branches.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), GitChangeset))
 

	
 
    def test_tags(self):
 
        # TODO: Need more tests here
 
        self.assertTrue('v0.1.1' in self.repo.tags)
kallithea/tests/vcs/test_hg.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
from kallithea.lib.vcs.backends.hg import MercurialRepository, MercurialChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import NodeKind, NodeState
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_HG_REPO_CLONE, \
 
    TEST_HG_REPO_PULL
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
# Use only clean mercurial's ui
 
from kallithea.lib.vcs.utils.hgcompat import mercurial
 
mercurial.scmutil.rcpath()
 
if mercurial.scmutil._rcpath:
 
    mercurial.scmutil._rcpath = mercurial.scmutil._rcpath[:1]
 

	
 

	
 
class MercurialRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_HG_REPO_CLONE):
 
            self.fail('Cannot test mercurial clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_HG_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = MercurialRepository(TEST_HG_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        self.assertRaises(RepositoryError, MercurialRepository, wrong_repo_path)
 

	
 
    def test_unicode_path_repo(self):
 
        self.assertRaises(VCSError,lambda:MercurialRepository(u'iShouldFail'))
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update',
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 

	
 
        #check if current workdir was updated
 
        self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
 
                                                    + '_w_update',
 
                                                    'MANIFEST.in')), True,)
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update',
 
            src_url=TEST_HG_REPO, update_after_clone=False)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
 
                                                    + '_wo_update',
 
                                                    'MANIFEST.in')), False,)
 

	
 
    def test_pull(self):
 
        if os.path.exists(TEST_HG_REPO_PULL):
 
            self.fail('Cannot test mercurial pull command as location %s '
 
                      'already exists. You should manually remove it first'
 
                      % TEST_HG_REPO_PULL)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL, create=True)
 
        self.assertTrue(len(self.repo.revisions) > len(repo_new.revisions))
 

	
 
        repo_new.pull(self.repo.path)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL)
 
        self.assertTrue(len(self.repo.revisions) == len(repo_new.revisions))
 

	
 
    def test_revisions(self):
 
        # there are 21 revisions at bitbucket now
 
        # so we can assume they would be available from now on
 
        subset = set(['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
 
                 '6cba7170863a2411822803fa77a0a264f1310b35',
 
                 '56349e29c2af3ac913b28bde9a2c6154436e615b',
 
                 '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                 '6fff84722075f1607a30f436523403845f84cd9e',
 
                 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                 'be90031137367893f1c406e0a8683010fd115b79',
 
                 'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                 '84478366594b424af694a6c784cb991a16b87c21',
 
                 '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
 
                 '786facd2c61deb9cf91e9534735124fb8fc11842',
 
                 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
 
                 'aa6a0de05b7612707db567078e130a6cd114a9a7',
 
                 'eada5a770da98ab0dd7325e29d00e0714f228d09'
 
                ])
 
        self.assertTrue(subset.issubset(set(self.repo.revisions)))
 

	
 

	
 
        # check if we have the proper order of revisions
 
        org = ['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
 
                '6cba7170863a2411822803fa77a0a264f1310b35',
 
                '56349e29c2af3ac913b28bde9a2c6154436e615b',
 
                '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                '6fff84722075f1607a30f436523403845f84cd9e',
 
                '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                'be90031137367893f1c406e0a8683010fd115b79',
 
                'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                '84478366594b424af694a6c784cb991a16b87c21',
 
                '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                '2e319b85e70a707bba0beff866d9f9de032aa4f9',
 
                '786facd2c61deb9cf91e9534735124fb8fc11842',
 
                '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
 
                'aa6a0de05b7612707db567078e130a6cd114a9a7',
 
                'eada5a770da98ab0dd7325e29d00e0714f228d09',
 
                '2c1885c735575ca478bf9e17b0029dca68824458',
 
                'd9bcd465040bf869799b09ad732c04e0eea99fe9',
 
                '469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7',
 
                '4fb8326d78e5120da2c7468dcf7098997be385da',
 
                '62b4a097164940bd66030c4db51687f3ec035eed',
 
                '536c1a19428381cfea92ac44985304f6a8049569',
 
                '965e8ab3c44b070cdaa5bf727ddef0ada980ecc4',
 
                '9bb326a04ae5d98d437dece54be04f830cf1edd9',
 
                'f8940bcb890a98c4702319fbe36db75ea309b475',
 
                'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
 
                '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
 
                'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
 
        self.assertEqual(org, self.repo.revisions[:31])
 

	
 
    def test_iter_slice(self):
 
        sliced = list(self.repo[:10])
 
        itered = list(self.repo)[:10]
 
        self.assertEqual(sliced, itered)
 

	
 
    def test_slicing(self):
 
        #4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            self.assertEqual(len(revs), size)
 
            self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
 
            self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 

	
 
        #active branches
 
        self.assertTrue('default' in self.repo.branches)
 
        self.assertTrue('stable' in self.repo.branches)
 

	
 
        # closed
 
        self.assertTrue('git' in self.repo._get_branches(closed=True))
 
        self.assertTrue('web' in self.repo._get_branches(closed=True))
 

	
 
        for name, id in self.repo.branches.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), MercurialChangeset))
 

	
 
    def test_tip_in_tags(self):
 
        # tip is always a tag
 
        self.assertIn('tip', self.repo.tags)
 

	
 
    def test_tip_changeset_in_tags(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(self.repo.tags['tip'], tip.raw_id)
 

	
 
    def test_initial_changeset(self):
 

	
 
        init_chset = self.repo.get_changeset(0)
 
        self.assertEqual(init_chset.message, 'initial import')
 
        self.assertEqual(init_chset.author,
 
            'Marcin Kuzminski <marcin@python-blog.com>')
 
        self.assertEqual(sorted(init_chset._file_paths),
 
            sorted([
 
                'vcs/__init__.py',
 
                'vcs/backends/BaseRepository.py',
 
                'vcs/backends/__init__.py',
 
            ])
 
        )
 
        self.assertEqual(sorted(init_chset._dir_paths),
 
            sorted(['', 'vcs', 'vcs/backends']))
 

	
 
        self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
kallithea/tests/vcs/test_inmemchangesets.py
Show inline comments
 
# encoding: utf8
 
"""
 
Tests so called "in memory changesets" commit API of vcs.
 
"""
 
from __future__ import with_statement
 

	
 
import time
 
import datetime
 

	
 
from kallithea.lib import vcs
 
from kallithea.tests.vcs.conf import SCM_TESTS, get_new_dir
 
from kallithea.lib.vcs.exceptions import EmptyRepositoryError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyAddedError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyExistsError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyRemovedError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyChangedError
 
from kallithea.lib.vcs.exceptions import NodeDoesNotExistError
 
from kallithea.lib.vcs.exceptions import NodeNotChangedError
 
from kallithea.lib.vcs.nodes import DirNode
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.lib.vcs.utils import safe_unicode
 

	
 

	
 
class InMemoryChangesetTestMixin(object):
 
    """
 
    This is a backend independent test case class which should be created
 
    with ``type`` method.
 

	
 
    It is required to set following attributes at subclass:
 

	
 
    - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
 
    - ``repo_path``: path to the repository which would be created for set of
 
      tests
 
    """
 

	
 
    def get_backend(self):
 
        return vcs.get_backend(self.backend_alias)
 

	
 
    def setUp(self):
 
        Backend = self.get_backend()
 
        self.repo_path = get_new_dir(str(time.time()))
 
        self.repo = Backend(self.repo_path, create=True)
 
        self.imc = self.repo.in_memory_changeset
 
        self.nodes = [
 
            FileNode('foobar', content='Foo & bar'),
 
            FileNode('foobar2', content='Foo & bar, doubled!'),
 
            FileNode('foo bar with spaces', content=''),
 
            FileNode('foo/bar/baz', content='Inside'),
 
            FileNode('foo/bar/file.bin', content='\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'),
 
        ]
 

	
 
    def test_add(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        for node in to_add:
 
            self.imc.add(node)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_in_bulk(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        self.imc.add(*to_add)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_actually_adds_all_nodes_at_second_commit_too(self):
 
        self.imc.add(FileNode('foo/bar/image.png', content='\0'))
 
        self.imc.add(FileNode('foo/README.txt', content='readme!'))
 
        changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
 
        self.assertTrue(isinstance(changeset.get_node('foo'), DirNode))
 
        self.assertTrue(isinstance(changeset.get_node('foo/bar'), DirNode))
 
        self.assertEqual(changeset.get_node('foo/bar/image.png').content, '\0')
 
        self.assertEqual(changeset.get_node('foo/README.txt').content, 'readme!')
 

	
 
        # commit some more files again
 
        to_add = [
 
            FileNode('foo/bar/foobaz/bar', content='foo'),
 
            FileNode('foo/bar/another/bar', content='foo'),
 
            FileNode('foo/baz.txt', content='foo'),
 
            FileNode('foobar/foobaz/file', content='foo'),
 
            FileNode('foobar/barbaz', content='foo'),
 
        ]
 
        self.imc.add(*to_add)
 
        changeset = self.imc.commit(u'Another', u'joe.doe@example.com')
 
        self.assertEqual(changeset.get_node('foo/bar/foobaz/bar').content, 'foo')
 
        self.assertEqual(changeset.get_node('foo/bar/another/bar').content, 'foo')
 
        self.assertEqual(changeset.get_node('foo/baz.txt').content, 'foo')
 
        self.assertEqual(changeset.get_node('foobar/foobaz/file').content, 'foo')
 
        self.assertEqual(changeset.get_node('foobar/barbaz').content, 'foo')
 

	
 
    def test_add_non_ascii_files(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [
 
            FileNode('żółwik/zwierzątko', content='ćććć'),
 
            FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
 
        ]
 
        for node in to_add:
 
            self.imc.add(node)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_raise_already_added(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.assertRaises(NodeAlreadyAddedError, self.imc.add, node)
 

	
 
    def test_check_integrity_raise_already_exist(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        self.imc.add(node)
 
        self.assertRaises(NodeAlreadyExistsError, self.imc.commit,
 
            message='new message',
 
            author=str(self))
 

	
 
    def test_change(self):
 
        self.imc.add(FileNode('foo/bar/baz', content='foo'))
 
        self.imc.add(FileNode('foo/fbar', content='foobar'))
 
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
 

	
 
        # Change node's content
 
        node = FileNode('foo/bar/baz', content='My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertNotEqual(tip, newtip)
 
        self.assertNotEqual(tip.id, newtip.id)
 
        self.assertEqual(newtip.get_node('foo/bar/baz').content,
 
                        'My **changed** content')
 

	
 
    def test_change_non_ascii(self):
 
        to_add = [
 
            FileNode('żółwik/zwierzątko', content='ćććć'),
 
            FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
 
        ]
 
        for node in to_add:
 
            self.imc.add(node)
 

	
 
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
 

	
 
        # Change node's content
 
        node = FileNode('żółwik/zwierzątko', content='My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % safe_unicode(node.path),
 
                        u'joe.doe@example.com')
 

	
 
        node = FileNode(u'żółwik/zwierzątko_uni', content=u'My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % safe_unicode(node.path),
 
                        u'joe.doe@example.com')
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertNotEqual(tip, newtip)
 
        self.assertNotEqual(tip.id, newtip.id)
 

	
 
        self.assertEqual(newtip.get_node('żółwik/zwierzątko').content,
 
                         'My **changed** content')
 
        self.assertEqual(newtip.get_node('żółwik/zwierzątko_uni').content,
 
                         'My **changed** content')
kallithea/tests/vcs/test_nodes.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import stat
 
from kallithea.lib.vcs.nodes import DirNode
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.nodes import Node
 
from kallithea.lib.vcs.nodes import NodeError
 
from kallithea.lib.vcs.nodes import NodeKind
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class NodeBasicTest(unittest.TestCase):
 

	
 
    def test_init(self):
 
        """
 
        Cannot innitialize Node objects with path with slash at the beginning.
 
        """
 
        wrong_paths = (
 
            '/foo',
 
            '/foo/bar'
 
        )
 
        for path in wrong_paths:
 
            self.assertRaises(NodeError, Node, path, NodeKind.FILE)
 

	
 
        wrong_paths = (
 
            '/foo/',
 
            '/foo/bar/'
 
        )
 
        for path in wrong_paths:
 
            self.assertRaises(NodeError, Node, path, NodeKind.DIR)
 

	
 
    def test_name(self):
 
        node = Node('', NodeKind.DIR)
 
        self.assertEqual(node.name, '')
 

	
 
        node = Node('path', NodeKind.FILE)
 
        self.assertEqual(node.name, 'path')
 

	
 
        node = Node('path/', NodeKind.DIR)
 
        self.assertEqual(node.name, 'path')
 

	
 
        node = Node('some/path', NodeKind.FILE)
 
        self.assertEqual(node.name, 'path')
 

	
 
        node = Node('some/path/', NodeKind.DIR)
 
        self.assertEqual(node.name, 'path')
 

	
 
    def test_root_node(self):
 
        self.assertRaises(NodeError, Node, '', NodeKind.FILE)
 

	
 
    def test_kind_setter(self):
 
        node = Node('', NodeKind.DIR)
 
        self.assertRaises(NodeError, setattr, node, 'kind', NodeKind.FILE)
 

	
 
    def _test_parent_path(self, node_path, expected_parent_path):
 
        """
 
        Tests if node's parent path are properly computed.
 
        """
 
        node = Node(node_path, NodeKind.DIR)
 
        parent_path = node.get_parent_path()
 
        self.assertTrue(parent_path.endswith('/') or \
 
            node.is_root() and parent_path == '')
 
        self.assertEqual(parent_path, expected_parent_path,
 
            "Node's path is %r and parent path is %r but should be %r"
 
            % (node.path, parent_path, expected_parent_path))
 

	
 
    def test_parent_path(self):
 
        test_paths = (
 
            # (node_path, expected_parent_path)
 
            ('', ''),
 
            ('some/path/', 'some/'),
 
            ('some/longer/path/', 'some/longer/'),
 
        )
 
        for node_path, expected_parent_path in test_paths:
 
            self._test_parent_path(node_path, expected_parent_path)
 

	
 
    '''
 
    def _test_trailing_slash(self, path):
 
        if not path.endswith('/'):
 
            self.fail("Trailing slash tests needs paths to end with slash")
 
        for kind in NodeKind.FILE, NodeKind.DIR:
 
            self.assertRaises(NodeError, Node, path=path, kind=kind)
 

	
 
    def test_trailing_slash(self):
 
        for path in ('/', 'foo/', 'foo/bar/', 'foo/bar/biz/'):
 
            self._test_trailing_slash(path)
 
    '''
 

	
 
    def test_is_file(self):
 
        node = Node('any', NodeKind.FILE)
 
        self.assertTrue(node.is_file())
 

	
 
        node = FileNode('any')
 
        self.assertTrue(node.is_file())
 
        self.assertRaises(AttributeError, getattr, node, 'nodes')
 

	
 
    def test_is_dir(self):
 
        node = Node('any_dir', NodeKind.DIR)
 
        self.assertTrue(node.is_dir())
 

	
 
        node = DirNode('any_dir')
 

	
 
        self.assertTrue(node.is_dir())
 
        self.assertRaises(NodeError, getattr, node, 'content')
 

	
 
    def test_dir_node_iter(self):
 
        nodes = [
 
            DirNode('docs'),
 
            DirNode('tests'),
 
            FileNode('bar'),
 
            FileNode('foo'),
 
            FileNode('readme.txt'),
 
            FileNode('setup.py'),
 
        ]
 
        dirnode = DirNode('', nodes=nodes)
 
        for node in dirnode:
 
            node == dirnode.get_node(node.path)
 

	
 
    def test_node_state(self):
 
        """
 
        Without link to changeset nodes should raise NodeError.
 
        """
 
        node = FileNode('anything')
 
        self.assertRaises(NodeError, getattr, node, 'state')
 
        node = DirNode('anything')
 
        self.assertRaises(NodeError, getattr, node, 'state')
 

	
 
    def test_file_node_stat(self):
 
        node = FileNode('foobar', 'empty... almost')
 
        mode = node.mode  # default should be 0100644
 
        self.assertTrue(mode & stat.S_IRUSR)
 
        self.assertTrue(mode & stat.S_IWUSR)
 
        self.assertTrue(mode & stat.S_IRGRP)
 
        self.assertTrue(mode & stat.S_IROTH)
 
        self.assertFalse(mode & stat.S_IWGRP)
 
        self.assertFalse(mode & stat.S_IWOTH)
 
        self.assertFalse(mode & stat.S_IXUSR)
 
        self.assertFalse(mode & stat.S_IXGRP)
 
        self.assertFalse(mode & stat.S_IXOTH)
 

	
 
    def test_file_node_is_executable(self):
 
        node = FileNode('foobar', 'empty... almost', mode=0100755)
 
        self.assertTrue(node.is_executable)
 

	
 
        node = FileNode('foobar', 'empty... almost', mode=0100500)
 
        self.assertTrue(node.is_executable)
 

	
 
        node = FileNode('foobar', 'empty... almost', mode=0100644)
 
        self.assertFalse(node.is_executable)
 

	
 
    def test_mimetype(self):
 
        py_node = FileNode('test.py')
 
        tar_node = FileNode('test.tar.gz')
 

	
 
        ext = 'CustomExtension'
 

	
 
        my_node2 = FileNode('myfile2')
 
        my_node2._mimetype = [ext]
 

	
 
        my_node3 = FileNode('myfile3')
 
        my_node3._mimetype = [ext,ext]
 

	
 
        self.assertEqual(py_node.mimetype,'text/x-python')
 
        self.assertEqual(py_node.get_mimetype(),('text/x-python',None))
 

	
 
        self.assertEqual(tar_node.mimetype,'application/x-tar')
 
        self.assertEqual(tar_node.get_mimetype(),('application/x-tar','gzip'))
 

	
 
        self.assertRaises(NodeError,my_node2.get_mimetype)
 

	
 
        self.assertEqual(my_node3.mimetype,ext)
 
        self.assertEqual(my_node3.get_mimetype(),[ext,ext])
 

	
 
class NodeContentTest(unittest.TestCase):
 

	
 
    def test_if_binary(self):
 
        data = """\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x10\x00\x00\x00\x10\x08\x06\x00\x00\x00\x1f??a\x00\x00\x00\x04gAMA\x00\x00\xaf?7\x05\x8a?\x00\x00\x00\x19tEXtSoftware\x00Adobe ImageReadyq?e<\x00\x00\x025IDAT8?\xa5\x93?K\x94Q\x14\x87\x9f\xf7?Q\x1bs4?\x03\x9a\xa8?B\x02\x8b$\x10[U;i\x13?6h?&h[?"\x14j?\xa2M\x7fB\x14F\x9aQ?&\x842?\x0b\x89"\x82??!?\x9c!\x9c2l??{N\x8bW\x9dY\xb4\t/\x1c?=\x9b?}????\xa9*;9!?\x83\x91?[?\\v*?D\x04\'`EpNp\xa2X\'U?pVq"Sw.\x1e?\x08\x01D?jw????\xbc??7{|\x9b?\x89$\x01??W@\x15\x9c\x05q`Lt/\x97?\x94\xa1d?\x18~?\x18?\x18W[%\xb0?\x83??\x14\x88\x8dB?\xa6H\tL\tl\x19>/\x01`\xac\xabx?\x9cl\nx\xb0\x98\x07\x95\x88D$"q[\x19?d\x00(o\n\xa0??\x7f\xb9\xa4?\x1bF\x1f\x8e\xac\xa8?j??eUU}?.?\x9f\x8cE??x\x94??\r\xbdtoJU5"0N\x10U?\x00??V\t\x02\x9f\x81?U?\x00\x9eM\xae2?r\x9b7\x83\x82\x8aP3????.?&"?\xb7ZP \x0c<?O\xa5\t}\xb8?\x99\xa6?\x87?\x1di|/\xa0??0\xbe\x1fp?d&\x1a\xad\x95\x8a\x07?\t*\x10??b:?d?.\x13C\x8a?\x12\xbe\xbf\x8e?{???\x08?\x80\xa7\x13+d\x13>J?\x80\x15T\x95\x9a\x00??S\x8c\r?\xa1\x03\x07?\x96\x9b\xa7\xab=E??\xa4\xb3?\x19q??B\x91=\x8d??k?J\x0bV"??\xf7x?\xa1\x00?\\.\x87\x87???\x02F@D\x99],??\x10#?X\xb7=\xb9\x10?Z\x1by???cI??\x1ag?\x92\xbc?T?t[\x92\x81?<_\x17~\x92\x88?H%?\x10Q\x02\x9f\n\x81qQ\x0bm?\x1bX?\xb1AK\xa6\x9e\xb9?u\xb2?1\xbe|/\x92M@\xa2!F?\xa9>"\r<DT?>\x92\x8e?>\x9a9Qv\x127?a\xac?Y?8?:??]X???9\x80\xb7?u?\x0b#BZ\x8d=\x1d?p\x00\x00\x00\x00IEND\xaeB`\x82"""
 
        filenode = FileNode('calendar.png', content=data)
 
        self.assertTrue(filenode.is_binary)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_repository.py
Show inline comments
 
from __future__ import with_statement
 
import datetime
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.tests.vcs.conf import TEST_USER_CONFIG_FILE
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 

	
 

	
 
class RepositoryBaseTest(_BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return super(RepositoryBaseTest, cls)._get_commits()[:1]
 

	
 
    def test_get_config_value(self):
 
        self.assertEqual(self.repo.get_config_value('universal', 'foo',
 
            TEST_USER_CONFIG_FILE), 'bar')
 

	
 
    def test_get_config_value_defaults_to_None(self):
 
        self.assertEqual(self.repo.get_config_value('universal', 'nonexist',
 
            TEST_USER_CONFIG_FILE), None)
 

	
 
    def test_get_user_name(self):
 
        self.assertEqual(self.repo.get_user_name(TEST_USER_CONFIG_FILE),
 
            'Foo Bar')
 

	
 
    def test_get_user_email(self):
 
        self.assertEqual(self.repo.get_user_email(TEST_USER_CONFIG_FILE),
 
            'foo.bar@example.com')
 

	
 
    def test_repo_equality(self):
 
        self.assertTrue(self.repo == self.repo)
 

	
 
    def test_repo_equality_broken_object(self):
 
        import copy
 
        _repo = copy.copy(self.repo)
 
        delattr(_repo, 'path')
 
        self.assertTrue(self.repo != _repo)
 

	
 
    def test_repo_equality_other_object(self):
 
        class dummy(object):
 
            path = self.repo.path
 
        self.assertTrue(self.repo != dummy())
 

	
 

	
 
class RepositoryGetDiffTest(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='foobar'),
 
                    FileNode('foobar2', content='foobar2'),
 
                ],
 
            },
 
            {
 
                'message': 'Changed foobar, added foobar3',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('foobar3', content='foobar3'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'FOOBAR'),
 
                ],
 
            },
 
            {
 
                'message': 'Removed foobar, changed foobar3',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'changed': [
 
                    FileNode('foobar3', content='FOOBAR\nFOOBAR\nFOOBAR\n'),
 
                ],
 
                'removed': [FileNode('foobar')],
 
            },
 
        ]
 
        return commits
 

	
 
    def test_raise_for_wrong(self):
 
        with self.assertRaises(ChangesetDoesNotExistError):
 
            self.repo.get_diff('a' * 40, 'b' * 40)
 

	
 

	
 
class GitRepositoryGetDiffTest(RepositoryGetDiffTest, unittest.TestCase):
 
    backend_alias = 'git'
 

	
 
    def test_initial_commit_diff(self):
 
        initial_rev = self.repo.revisions[0]
 
        self.assertEqual(self.repo.get_diff(self.repo.EMPTY_CHANGESET, initial_rev), '''diff --git a/foobar b/foobar
 
new file mode 100644
 
index 0000000000000000000000000000000000000000..f6ea0495187600e7b2288c8ac19c5886383a4632
 
--- /dev/null
 
+++ b/foobar
 
@@ -0,0 +1 @@
 
+foobar
 
\ No newline at end of file
 
diff --git a/foobar2 b/foobar2
 
new file mode 100644
 
index 0000000000000000000000000000000000000000..e8c9d6b98e3dce993a464935e1a53f50b56a3783
 
--- /dev/null
 
+++ b/foobar2
 
@@ -0,0 +1 @@
 
+foobar2
 
\ No newline at end of file
 
''')
 

	
 
    def test_second_changeset_diff(self):
 
        revs = self.repo.revisions
 
        self.assertEqual(self.repo.get_diff(revs[0], revs[1]), '''diff --git a/foobar b/foobar
 
index f6ea0495187600e7b2288c8ac19c5886383a4632..389865bb681b358c9b102d79abd8d5f941e96551 100644
 
--- a/foobar
 
+++ b/foobar
 
@@ -1 +1 @@
 
-foobar
 
\ No newline at end of file
 
+FOOBAR
 
\ No newline at end of file
 
diff --git a/foobar3 b/foobar3
 
new file mode 100644
 
index 0000000000000000000000000000000000000000..c11c37d41d33fb47741cff93fa5f9d798c1535b0
 
--- /dev/null
 
+++ b/foobar3
 
@@ -0,0 +1 @@
 
+foobar3
 
\ No newline at end of file
 
''')
 

	
 
    def test_third_changeset_diff(self):
 
        revs = self.repo.revisions
 
        self.assertEqual(self.repo.get_diff(revs[1], revs[2]), '''diff --git a/foobar b/foobar
 
deleted file mode 100644
 
index 389865bb681b358c9b102d79abd8d5f941e96551..0000000000000000000000000000000000000000
 
--- a/foobar
 
+++ /dev/null
 
@@ -1 +0,0 @@
 
-FOOBAR
 
\ No newline at end of file
 
diff --git a/foobar3 b/foobar3
 
index c11c37d41d33fb47741cff93fa5f9d798c1535b0..f9324477362684ff692aaf5b9a81e01b9e9a671c 100644
 
--- a/foobar3
 
+++ b/foobar3
 
@@ -1 +1,3 @@
 
-foobar3
 
\ No newline at end of file
 
+FOOBAR
 
+FOOBAR
 
+FOOBAR
 
''')
 

	
 

	
 
class HgRepositoryGetDiffTest(RepositoryGetDiffTest, unittest.TestCase):
 
    backend_alias = 'hg'
 

	
 
    def test_initial_commit_diff(self):
 
        initial_rev = self.repo.revisions[0]
 
        self.assertEqual(self.repo.get_diff(self.repo.EMPTY_CHANGESET, initial_rev), '''diff --git a/foobar b/foobar
 
new file mode 100644
 
--- /dev/null
 
+++ b/foobar
 
@@ -0,0 +1,1 @@
 
+foobar
 
\ No newline at end of file
 
diff --git a/foobar2 b/foobar2
 
new file mode 100644
 
--- /dev/null
 
+++ b/foobar2
 
@@ -0,0 +1,1 @@
 
+foobar2
 
\ No newline at end of file
 
''')
 

	
 
    def test_second_changeset_diff(self):
 
        revs = self.repo.revisions
 
        self.assertEqual(self.repo.get_diff(revs[0], revs[1]), '''diff --git a/foobar b/foobar
 
--- a/foobar
 
+++ b/foobar
 
@@ -1,1 +1,1 @@
 
-foobar
 
\ No newline at end of file
 
+FOOBAR
 
\ No newline at end of file
 
diff --git a/foobar3 b/foobar3
 
new file mode 100644
 
--- /dev/null
 
+++ b/foobar3
 
@@ -0,0 +1,1 @@
kallithea/tests/vcs/test_tags.py
Show inline comments
 
from __future__ import with_statement
 

	
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 
from kallithea.lib.vcs.exceptions import TagAlreadyExistError
 
from kallithea.lib.vcs.exceptions import TagDoesNotExistError
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class TagsTestCaseMixin(_BackendTestMixin):
 

	
 
    def test_new_tag(self):
 
        tip = self.repo.get_changeset()
 
        tagsize = len(self.repo.tags)
 
        tag = self.repo.tag('last-commit', 'joe', tip.raw_id)
 

	
 
        self.assertEqual(len(self.repo.tags), tagsize + 1)
 
        for top, dirs, files in tip.walk():
 
            self.assertEqual(top, tag.get_node(top.path))
 

	
 
    def test_tag_already_exist(self):
 
        tip = self.repo.get_changeset()
 
        self.repo.tag('last-commit', 'joe', tip.raw_id)
 

	
 
        self.assertRaises(TagAlreadyExistError,
 
            self.repo.tag, 'last-commit', 'joe', tip.raw_id)
 

	
 
        chset = self.repo.get_changeset(0)
 
        self.assertRaises(TagAlreadyExistError,
 
            self.repo.tag, 'last-commit', 'jane', chset.raw_id)
 

	
 
    def test_remove_tag(self):
 
        tip = self.repo.get_changeset()
 
        self.repo.tag('last-commit', 'joe', tip.raw_id)
 
        tagsize = len(self.repo.tags)
 

	
 
        self.repo.remove_tag('last-commit', user='evil joe')
 
        self.assertEqual(len(self.repo.tags), tagsize - 1)
 

	
 
    def test_remove_tag_which_does_not_exist(self):
 
        self.assertRaises(TagDoesNotExistError,
 
            self.repo.remove_tag, 'last-commit', user='evil joe')
 

	
 
    def test_name_with_slash(self):
 
        self.repo.tag('19/10/11', 'joe')
 
        self.assertTrue('19/10/11' in self.repo.tags)
 
        self.repo.tag('11', 'joe')
 
        self.assertTrue('11' in self.repo.tags)
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s tags test' % alias).title().split())
 
    bases = (TagsTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_utils.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
import mock
 
import time
 
import shutil
 
import tempfile
 
import datetime
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.lib.vcs.utils.paths import get_dirs_for_path
 
from kallithea.lib.vcs.utils.helpers import get_dict_for_attrs
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.helpers import get_scms_for_path
 
from kallithea.lib.vcs.utils.helpers import get_total_seconds
 
from kallithea.lib.vcs.utils.helpers import parse_changesets
 
from kallithea.lib.vcs.utils.helpers import parse_datetime
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.vcs.utils.paths import get_user_home
 
from kallithea.lib.vcs.exceptions import VCSError
 

	
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH
 

	
 

	
 
class PathsTest(unittest.TestCase):
 

	
 
    def _test_get_dirs_for_path(self, path, expected):
 
        """
 
        Tests if get_dirs_for_path returns same as expected.
 
        """
 
        expected = sorted(expected)
 
        result = sorted(get_dirs_for_path(path))
 
        self.assertEqual(result, expected,
 
            msg="%s != %s which was expected result for path %s"
 
            % (result, expected, path))
 

	
 
    def test_get_dirs_for_path(self):
 
        path = 'foo/bar/baz/file'
 
        paths_and_results = (
 
            ('foo/bar/baz/file', ['foo', 'foo/bar', 'foo/bar/baz']),
 
            ('foo/bar/', ['foo', 'foo/bar']),
 
            ('foo/bar', ['foo']),
 
        )
 
        for path, expected in paths_and_results:
 
            self._test_get_dirs_for_path(path, expected)
 

	
 

	
 
    def test_get_scm(self):
 
        self.assertEqual(('hg', TEST_HG_REPO), get_scm(TEST_HG_REPO))
 
        self.assertEqual(('git', TEST_GIT_REPO), get_scm(TEST_GIT_REPO))
 

	
 
    def test_get_two_scms_for_path(self):
 
        multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo-2')
 
        if os.path.isdir(multialias_repo_path):
 
            shutil.rmtree(multialias_repo_path)
 

	
 
        os.mkdir(multialias_repo_path)
 

	
 
        self.assertRaises(VCSError, get_scm, multialias_repo_path)
 

	
 
    def test_get_scm_error_path(self):
 
        self.assertRaises(VCSError, get_scm, 'err')
 

	
 
    def test_get_scms_for_path(self):
 
        dirpath = tempfile.gettempdir()
 
        new = os.path.join(dirpath, 'vcs-scms-for-path-%s' % time.time())
 
        os.mkdir(new)
 
        self.assertEqual(get_scms_for_path(new), [])
 

	
 
        os.mkdir(os.path.join(new, '.tux'))
 
        self.assertEqual(get_scms_for_path(new), [])
 

	
 
        os.mkdir(os.path.join(new, '.git'))
 
        self.assertEqual(set(get_scms_for_path(new)), set(['git']))
 

	
 
        os.mkdir(os.path.join(new, '.hg'))
 
        self.assertEqual(set(get_scms_for_path(new)), set(['git', 'hg']))
 

	
 

	
 
class TestParseChangesets(unittest.TestCase):
 

	
 
    def test_main_is_returned_correctly(self):
 
        self.assertEqual(parse_changesets('123456'), {
 
            'start': None,
 
            'main': '123456',
 
            'end': None,
 
        })
 

	
 
    def test_start_is_returned_correctly(self):
 
        self.assertEqual(parse_changesets('aaabbb..'), {
 
            'start': 'aaabbb',
 
            'main': None,
 
            'end': None,
 
        })
 

	
 
    def test_end_is_returned_correctly(self):
 
        self.assertEqual(parse_changesets('..cccddd'), {
 
            'start': None,
 
            'main': None,
 
            'end': 'cccddd',
 
        })
 

	
 
    def test_that_two_or_three_dots_are_allowed(self):
 
        text1 = 'a..b'
 
        text2 = 'a...b'
 
        self.assertEqual(parse_changesets(text1), parse_changesets(text2))
 

	
 
    def test_that_input_is_stripped_first(self):
 
        text1 = 'a..bb'
 
        text2 = '  a..bb\t\n\t '
 
        self.assertEqual(parse_changesets(text1), parse_changesets(text2))
 

	
 
    def test_that_exception_is_raised(self):
 
        text = '123456.789012' # single dot is not recognized
 
        with self.assertRaises(ValueError):
 
            parse_changesets(text)
 

	
 
    def test_non_alphanumeric_raises_exception(self):
 
        with self.assertRaises(ValueError):
 
            parse_changesets('aaa@bbb')
 

	
 

	
 
class TestParseDatetime(unittest.TestCase):
 

	
 
    def test_datetime_text(self):
 
        self.assertEqual(parse_datetime('2010-04-07 21:29:41'),
 
            datetime.datetime(2010, 4, 7, 21, 29, 41))
 

	
 
    def test_no_seconds(self):
 
        self.assertEqual(parse_datetime('2010-04-07 21:29'),
 
            datetime.datetime(2010, 4, 7, 21, 29))
 

	
 
    def test_date_only(self):
 
        self.assertEqual(parse_datetime('2010-04-07'),
 
            datetime.datetime(2010, 4, 7))
 

	
 
    def test_another_format(self):
 
        self.assertEqual(parse_datetime('04/07/10 21:29:41'),
 
            datetime.datetime(2010, 4, 7, 21, 29, 41))
 

	
 
    def test_now(self):
 
        self.assertTrue(parse_datetime('now') - datetime.datetime.now() <
 
            datetime.timedelta(seconds=1))
 

	
 
    def test_today(self):
 
        today = datetime.date.today()
 
        self.assertEqual(parse_datetime('today'),
 
            datetime.datetime(*today.timetuple()[:3]))
 

	
 
    def test_yesterday(self):
 
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
 
        self.assertEqual(parse_datetime('yesterday'),
 
            datetime.datetime(*yesterday.timetuple()[:3]))
 

	
 
    def test_tomorrow(self):
 
        tomorrow = datetime.date.today() + datetime.timedelta(days=1)
 
        args = tomorrow.timetuple()[:3] + (23, 59, 59)
 
        self.assertEqual(parse_datetime('tomorrow'), datetime.datetime(*args))
 

	
 
    def test_days(self):
 
        timestamp = datetime.datetime.today() - datetime.timedelta(days=3)
 
        args = timestamp.timetuple()[:3] + (0, 0, 0, 0)
 
        expected = datetime.datetime(*args)
 
        self.assertEqual(parse_datetime('3d'), expected)
 
        self.assertEqual(parse_datetime('3 d'), expected)
 
        self.assertEqual(parse_datetime('3 day'), expected)
 
        self.assertEqual(parse_datetime('3 days'), expected)
 

	
 
    def test_weeks(self):
 
        timestamp = datetime.datetime.today() - datetime.timedelta(days=3 * 7)
 
        args = timestamp.timetuple()[:3] + (0, 0, 0, 0)
 
        expected = datetime.datetime(*args)
 
        self.assertEqual(parse_datetime('3w'), expected)
 
        self.assertEqual(parse_datetime('3 w'), expected)
 
        self.assertEqual(parse_datetime('3 week'), expected)
 
        self.assertEqual(parse_datetime('3 weeks'), expected)
 

	
 
    def test_mixed(self):
 
        timestamp = datetime.datetime.today() - datetime.timedelta(days=2 * 7 + 3)
 
        args = timestamp.timetuple()[:3] + (0, 0, 0, 0)
 
        expected = datetime.datetime(*args)
 
        self.assertEqual(parse_datetime('2w3d'), expected)
 
        self.assertEqual(parse_datetime('2w 3d'), expected)
 
        self.assertEqual(parse_datetime('2w 3 days'), expected)
 
        self.assertEqual(parse_datetime('2 weeks 3 days'), expected)
 

	
 

	
 
class TestAuthorExtractors(unittest.TestCase):
 
    TEST_AUTHORS = [("Username Last'o'Name <username@python-works.com>",
 
                    ("Username Last'o'Name", "username@python-works.com")),
 
                  ("Username Last'o'Name Spaces < username@python-works.com >",
 
                    ("Username Last'o'Name Spaces", "username@python-works.com")),
 
                  ("Username Last'o'Name <username.lastname@python-works.com>",
 
                    ("Username Last'o'Name", "username.lastname@python-works.com")),
kallithea/tests/vcs/test_utils_filesize.py
Show inline comments
 
from __future__ import with_statement
 

	
 
from kallithea.lib.vcs.utils.filesize import filesizeformat
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
class TestFilesizeformat(unittest.TestCase):
 

	
 
    def test_bytes(self):
 
        self.assertEqual(filesizeformat(10), '10 B')
 

	
 
    def test_kilobytes(self):
 
        self.assertEqual(filesizeformat(1024 * 2), '2 KB')
 

	
 
    def test_megabytes(self):
 
        self.assertEqual(filesizeformat(1024 * 1024 * 2.3), '2.3 MB')
 

	
 
    def test_gigabytes(self):
 
        self.assertEqual(filesizeformat(1024 * 1024 * 1024 * 12.92), '12.92 GB')
 

	
 
    def test_that_function_respects_sep_paramtere(self):
 
        self.assertEqual(filesizeformat(1, ''), '1B')
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_vcs.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import os
 
import shutil
 

	
 
from kallithea.lib.vcs import VCSError, get_repo, get_backend
 
from kallithea.lib.vcs.backends.hg import MercurialRepository
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH
 

	
 

	
 

	
 
class VCSTest(unittest.TestCase):
 
    """
 
    Tests for main module's methods.
 
    """
 

	
 
    def test_get_backend(self):
 
        hg = get_backend('hg')
 
        self.assertEqual(hg, MercurialRepository)
 

	
 
    def test_alias_detect_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 
        self.assertEqual('hg',repo.alias)
 

	
 
    def test_alias_detect_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 
        self.assertEqual('git',repo.alias)
 

	
 
    def test_wrong_alias(self):
 
        alias = 'wrong_alias'
 
        self.assertRaises(VCSError, get_backend, alias)
 

	
 
    def test_get_repo(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 

	
 
        self.assertEqual(repo.__class__, get_repo(path, alias).__class__)
 
        self.assertEqual(repo.path, get_repo(path, alias).path)
 

	
 
    def test_get_repo_autoalias_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 

	
 
        self.assertEqual(repo.__class__, get_repo(path).__class__)
 
        self.assertEqual(repo.path, get_repo(path).path)
 

	
 
    def test_get_repo_autoalias_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 

	
 
        self.assertEqual(repo.__class__, get_repo(path).__class__)
 
        self.assertEqual(repo.path, get_repo(path).path)
 

	
 

	
 
    def test_get_repo_err(self):
 
        blank_repo_path = os.path.join(TEST_TMP_PATH, 'blank-error-repo')
 
        if os.path.isdir(blank_repo_path):
 
            shutil.rmtree(blank_repo_path)
 

	
 
        os.mkdir(blank_repo_path)
 
        self.assertRaises(VCSError, get_repo, blank_repo_path)
 
        self.assertRaises(VCSError, get_repo, blank_repo_path + 'non_existing')
 

	
 
    def test_get_repo_multialias(self):
 
        multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo')
 
        if os.path.isdir(multialias_repo_path):
 
            shutil.rmtree(multialias_repo_path)
 

	
 
        os.mkdir(multialias_repo_path)
 

	
 
        os.mkdir(os.path.join(multialias_repo_path, '.git'))
 
        os.mkdir(os.path.join(multialias_repo_path, '.hg'))
 
        self.assertRaises(VCSError, get_repo, multialias_repo_path)
kallithea/tests/vcs/test_workdirs.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import datetime
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import SCM_TESTS
 

	
 

	
 
class WorkdirTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': u'Initial commit',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='Foobar'),
 
                    FileNode('foobar2', content='Foobar II'),
 
                    FileNode('foo/bar/baz', content='baz here!'),
 
                ],
 
            },
 
            {
 
                'message': u'Changes...',
 
                'author': u'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('some/new.txt', content='news...'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'Foobar I'),
 
                ],
 
                'removed': [],
 
            },
 
        ]
 
        return commits
 

	
 
    def test_get_branch_for_default_branch(self):
 
        self.assertEqual(self.repo.workdir.get_branch(),
 
            self.repo.DEFAULT_BRANCH_NAME)
 

	
 
    def test_get_branch_after_adding_one(self):
 
        self.imc.add(FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertEqual(self.repo.workdir.get_branch(), self.default_branch)
 

	
 
    def test_get_changeset(self):
 
        old_head = self.repo.get_changeset()
 
        self.imc.add(FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        head = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertEqual(self.repo.workdir.get_branch(), self.default_branch)
 
        self.repo.workdir.checkout_branch('foobar')
 
        self.assertEqual(self.repo.workdir.get_changeset(), head)
 

	
 
        # Make sure that old head is still there after update to defualt branch
 
        self.repo.workdir.checkout_branch(self.default_branch)
 
        self.assertEqual(self.repo.workdir.get_changeset(), old_head)
 

	
 
    def test_checkout_branch(self):
 
        from kallithea.lib.vcs.exceptions import BranchDoesNotExistError
 
        # first, 'foobranch' does not exist.
 
        self.assertRaises(BranchDoesNotExistError, self.repo.workdir.checkout_branch,
 
                          branch='foobranch')
 
        # create new branch 'foobranch'.
 
        self.imc.add(FileNode('file1', content='blah'))
 
        self.imc.commit(message=u'asd', author=u'john', branch='foobranch')
 
        # go back to the default branch
 
        self.repo.workdir.checkout_branch()
 
        self.assertEqual(self.repo.workdir.get_branch(), self.backend_class.DEFAULT_BRANCH_NAME)
 
        # checkout 'foobranch'
 
        self.repo.workdir.checkout_branch('foobranch')
 
        self.assertEqual(self.repo.workdir.get_branch(), 'foobranch')
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s branch test' % alias).title().split())
 
    bases = (WorkdirTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
0 comments (0 inline, 0 general)