Changeset - 4b68fbe195b6
[Not reviewed]
kallithea/bin/kallithea_api.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.kallithea_api
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Api CLI client for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 3, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import argparse
 
import json
 
import sys
 

	
 
from kallithea.bin.base import FORMAT_JSON, FORMAT_PRETTY, RcConf, api_call
 

	
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-api [-h] [--format=FORMAT] [--apikey=APIKEY] [--apihost=APIHOST] "
 
      "[--config=CONFIG] [--save-config] "
 
      "METHOD <key:val> <key2:val> ...\n"
 
      "Create config file: kallithea-api --apikey=<key> --apihost=http://kallithea.example.com --save-config"
 
    )
 

	
 
    parser = argparse.ArgumentParser(description='Kallithea API cli',
 
                                     usage=usage)
 

	
 
    ## config
 
    group = parser.add_argument_group('config')
 
    group.add_argument('--apikey', help='api access key')
 
    group.add_argument('--apihost', help='api host')
 
    group.add_argument('--config', help='config file')
 
    group.add_argument('--save-config', action='store_true', help='save the given config into a file')
 

	
 
    group = parser.add_argument_group('API')
 
    group.add_argument('method', metavar='METHOD', nargs='?', type=str, default=None,
 
            help='API method name to call followed by key:value attributes',
 
    )
 
    group.add_argument('--format', dest='format', type=str,
 
            help='output format default: `%s` can '
 
                 'be also `%s`' % (FORMAT_PRETTY, FORMAT_JSON),
 
            default=FORMAT_PRETTY
 
    )
 
    args, other = parser.parse_known_args()
 
    return parser, args, other
 

	
 

	
 
def main(argv=None):
 
    """
 
    Main execution function for cli
 

	
 
    :param argv:
 
    """
 
    if argv is None:
 
        argv = sys.argv
 

	
 
    conf = None
 
    parser, args, other = argparser(argv)
 

	
 
    api_credentials_given = (args.apikey and args.apihost)
 
    if args.save_config:
 
        if not api_credentials_given:
 
            raise parser.error('--save-config requires --apikey and --apihost')
 
        conf = RcConf(config_location=args.config,
 
                      autocreate=True, config={'apikey': args.apikey,
 
                                               'apihost': args.apihost})
 
        sys.exit()
 

	
 
    if not conf:
 
        conf = RcConf(config_location=args.config, autoload=True)
 
        if not conf:
 
            if not api_credentials_given:
 
                parser.error('Could not find config file and missing '
 
                             '--apikey or --apihost in params')
 

	
 
    apikey = args.apikey or conf['apikey']
 
    apihost = args.apihost or conf['apihost']
 
    method = args.method
 

	
 
    # if we don't have method here it's an error
 
    if not method:
 
        parser.error('Please specify method name')
 

	
 
    try:
 
        margs = dict(s.split(':', 1) for s in other)
 
    except ValueError:
 
        sys.stderr.write('Error parsing arguments \n')
 
        sys.exit()
 
    if args.format == FORMAT_PRETTY:
 
        print('Calling method %s => %s' % (method, apihost))
 

	
 
    json_resp = api_call(apikey, apihost, method, **margs)
 
    error_prefix = ''
 
    if json_resp['error']:
 
        error_prefix = 'ERROR:'
 
        json_data = json_resp['error']
 
    else:
 
        json_data = json_resp['result']
 
    if args.format == FORMAT_JSON:
 
        print(json.dumps(json_data))
 
    elif args.format == FORMAT_PRETTY:
 
        print('Server response \n%s%s' % (
 
            error_prefix, json.dumps(json_data, indent=4, sort_keys=True)
 
        ))
 
    return 0
kallithea/bin/kallithea_cli_ishell.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
This file was forked by the Kallithea project in July 2014 and later moved.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import sys
 

	
 
import kallithea.bin.kallithea_cli_base as cli_base
 
from kallithea.model.db import *  # these names will be directly available in the IPython shell
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
def ishell():
 
    """Interactive shell for Kallithea."""
 
    try:
 
        from IPython import embed
 
    except ImportError:
 
        print('Kallithea ishell requires the Python package IPython 4 or later')
 
        sys.exit(-1)
 
    from traitlets.config.loader import Config
 
    cfg = Config()
 
    cfg.InteractiveShellEmbed.confirm_exit = False
 
    embed(config=cfg, banner1="Kallithea IShell.")
kallithea/bin/kallithea_gist.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.kallithea_gist
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Gist CLI client for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import argparse
 
import fileinput
 
import json
 
import os
 
import stat
 
import sys
 

	
 
from kallithea.bin.base import FORMAT_JSON, FORMAT_PRETTY, RcConf, api_call
 

	
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-gist [-h] [--format=FORMAT] [--apikey=APIKEY] [--apihost=APIHOST] "
 
      "[--config=CONFIG] [--save-config] [GIST OPTIONS] "
 
      "[filename or stdin use - for terminal stdin ]\n"
 
      "Create config file: kallithea-gist --apikey=<key> --apihost=http://kallithea.example.com --save-config"
 
    )
 

	
 
    parser = argparse.ArgumentParser(description='Kallithea Gist cli',
 
                                     usage=usage)
 

	
 
    ## config
 
    group = parser.add_argument_group('config')
 
    group.add_argument('--apikey', help='api access key')
 
    group.add_argument('--apihost', help='api host')
 
    group.add_argument('--config', help='config file path DEFAULT: ~/.config/kallithea')
 
    group.add_argument('--save-config', action='store_true',
 
                       help='save the given config into a file')
 

	
 
    group = parser.add_argument_group('GIST')
 
    group.add_argument('-p', '--private', action='store_true',
 
                       help='create private Gist')
 
    group.add_argument('-f', '--filename',
 
                       help='set uploaded gist filename, '
 
                            'also defines syntax highlighting')
 
    group.add_argument('-d', '--description', help='Gist description')
 
    group.add_argument('-l', '--lifetime', metavar='MINUTES',
 
                       help='gist lifetime in minutes, -1 (DEFAULT) is forever')
 
    group.add_argument('--format', dest='format', type=str,
 
                       help='output format DEFAULT: `%s` can '
 
                       'be also `%s`' % (FORMAT_PRETTY, FORMAT_JSON),
 
            default=FORMAT_PRETTY
 
    )
 
    args, other = parser.parse_known_args()
 
    return parser, args, other
 

	
 

	
 
def _run(argv):
 
    conf = None
 
    parser, args, other = argparser(argv)
 

	
 
    api_credentials_given = (args.apikey and args.apihost)
 
    if args.save_config:
 
        if not api_credentials_given:
 
            raise parser.error('--save-config requires --apikey and --apihost')
 
        conf = RcConf(config_location=args.config,
 
                      autocreate=True, config={'apikey': args.apikey,
 
                                               'apihost': args.apihost})
 
        sys.exit()
 

	
 
    if not conf:
 
        conf = RcConf(config_location=args.config, autoload=True)
 
        if not conf:
 
            if not api_credentials_given:
 
                parser.error('Could not find config file and missing '
 
                             '--apikey or --apihost in params')
 

	
 
    apikey = args.apikey or conf['apikey']
 
    host = args.apihost or conf['apihost']
 
    DEFAULT_FILENAME = 'gistfile1.txt'
 
    if other:
 
        # skip multifiles for now
 
        filename = other[0]
 
        if filename == '-':
 
            filename = DEFAULT_FILENAME
 
            gist_content = ''
 
            for line in fileinput.input('-'):
 
                gist_content += line
 
        else:
 
            with open(filename, 'rb') as f:
 
                gist_content = f.read()
 

	
 
    else:
 
        filename = DEFAULT_FILENAME
 
        gist_content = None
 
        # little bit hacky but cross platform check where the
 
        # stdin comes from we skip the terminal case it can be handled by '-'
 
        mode = os.fstat(0).st_mode
 
        if stat.S_ISFIFO(mode):
 
            # "stdin is piped"
 
            gist_content = sys.stdin.read()
 
        elif stat.S_ISREG(mode):
 
            # "stdin is redirected"
 
            gist_content = sys.stdin.read()
 
        else:
 
            # "stdin is terminal"
kallithea/bin/ldap_sync.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.ldap_sync
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
LDAP sync script
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 06, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import urllib.request
 
import uuid
 
from configparser import ConfigParser
 

	
 
import ldap
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.utils2 import ascii_bytes
 

	
 

	
 
config = ConfigParser()
 
config.read('ldap_sync.conf')
 

	
 

	
 
class InvalidResponseIDError(Exception):
 
    """ Request and response don't have the same UUID. """
 

	
 

	
 
class ResponseError(Exception):
 
    """ Response has an error, something went wrong with request execution. """
 

	
 

	
 
class UserAlreadyInGroupError(Exception):
 
    """ User is already a member of the target group. """
 

	
 

	
 
class UserNotInGroupError(Exception):
 
    """ User is not a member of the target group. """
 

	
 

	
 
class API(object):
 

	
 
    def __init__(self, url, key):
 
        self.url = url
 
        self.key = key
 

	
 
    def get_api_data(self, uid, method, args):
 
        """Prepare dict for API post."""
 
        return {
 
            "id": uid,
 
            "api_key": self.key,
 
            "method": method,
 
            "args": args
 
        }
 

	
 
    def post(self, method, args):
 
        """Send a generic API post to Kallithea.
 

	
 
        This will generate the UUID for validation check after the
 
        response is returned. Handle errors and get the result back.
 
        """
 
        uid = str(uuid.uuid1())
 
        data = self.get_api_data(uid, method, args)
 

	
 
        data = ascii_bytes(ext_json.dumps(data))
 
        headers = {'content-type': 'text/plain'}
 
        req = urllib.request.Request(self.url, data, headers)
 

	
 
        response = urllib.request.urlopen(req)
 
        response = ext_json.load(response)
 

	
 
        if uid != response["id"]:
 
            raise InvalidResponseIDError("UUID does not match.")
 

	
 
        if response["error"] is not None:
 
            raise ResponseError(response["error"])
 

	
 
        return response["result"]
 

	
 
    def create_group(self, name, active=True):
 
        """Create the Kallithea user group."""
 
        args = {
 
            "group_name": name,
 
            "active": str(active)
 
        }
 
        self.post("create_user_group", args)
 

	
 
    def add_membership(self, group, username):
 
        """Add specific user to a group."""
 
        args = {
 
            "usersgroupid": group,
 
            "userid": username
 
        }
 
        result = self.post("add_user_to_user_group", args)
 
        if not result["success"]:
 
            raise UserAlreadyInGroupError("User %s already in group %s." %
 
                                          (username, group))
 

	
 
    def remove_membership(self, group, username):
 
        """Remove specific user from a group."""
 
        args = {
 
            "usersgroupid": group,
 
            "userid": username
 
        }
 
        result = self.post("remove_user_from_user_group", args)
 
        if not result["success"]:
kallithea/lib/db_manage.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.db_manage
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Database creation, and setup module for Kallithea. Used for creation
 
of database as well as for migration operations
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 10, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import logging
 
import os
 
import sys
 
import uuid
 

	
 
import alembic.command
 
import alembic.config
 
import sqlalchemy
 
from sqlalchemy.engine import create_engine
 

	
 
from kallithea.model.base import init_model
 
from kallithea.model.db import Permission, RepoGroup, Repository, Setting, Ui, User, UserRepoGroupToPerm, UserToPerm
 
#from kallithea.model import meta
 
from kallithea.model.meta import Base, Session
 
from kallithea.model.permission import PermissionModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class DbManage(object):
 
    def __init__(self, dbconf, root, tests=False, SESSION=None, cli_args=None):
 
        self.dbname = dbconf.split('/')[-1]
 
        self.tests = tests
 
        self.root = root
 
        self.dburi = dbconf
 
        self.db_exists = False
 
        self.cli_args = cli_args or {}
 
        self.init_db(SESSION=SESSION)
 

	
 
    def _ask_ok(self, msg):
 
        """Invoke ask_ok unless the force_ask option provides the answer"""
 
        force_ask = self.cli_args.get('force_ask')
 
        if force_ask is not None:
 
            return force_ask
 
        from kallithea.lib.utils2 import ask_ok
 
        return ask_ok(msg)
 

	
 
    def init_db(self, SESSION=None):
 
        if SESSION:
 
            self.sa = SESSION
 
        else:
 
            # init new sessions
 
            engine = create_engine(self.dburi)
 
            init_model(engine)
 
            self.sa = Session()
 

	
 
    def create_tables(self, override=False):
 
        """
 
        Create a auth database
 
        """
 

	
 
        log.info("Any existing database is going to be destroyed")
 
        if self.tests:
 
            destroy = True
 
        else:
 
            destroy = self._ask_ok('Are you sure to destroy old database ? [y/n]')
 
        if not destroy:
 
            print('Nothing done.')
 
            sys.exit(0)
 
        if destroy:
 
            # drop and re-create old schemas
 

	
 
            url = sqlalchemy.engine.url.make_url(self.dburi)
 
            database = url.database
 

	
 
            # Some databases enforce foreign key constraints and Base.metadata.drop_all() doesn't work
 
            if url.drivername == 'mysql':
 
                url.database = None  # don't connect to the database (it might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.execute('DROP DATABASE IF EXISTS ' + database)
 
                    conn.execute('CREATE DATABASE ' + database)
 
            elif url.drivername == 'postgresql':
 
                from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 
                url.database = 'postgres'  # connect to the system database (as the real one might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
 
                    conn.execute('DROP DATABASE IF EXISTS ' + database)
 
                    conn.execute('CREATE DATABASE ' + database)
 
            else:
 
                # known to work on SQLite - possibly not on other databases with strong referential integrity
 
                Base.metadata.drop_all()
 

	
 
        checkfirst = not override
 
        Base.metadata.create_all(checkfirst=checkfirst)
 

	
 
        # Create an Alembic configuration and generate the version table,
 
        # "stamping" it with the most recent Alembic migration revision, to
 
        # tell Alembic that all the schema upgrades are already in effect.
 
        alembic_cfg = alembic.config.Config()
 
        alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
 
        alembic_cfg.set_main_option('sqlalchemy.url', self.dburi)
kallithea/lib/pidlock.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from __future__ import print_function
 

	
 
import errno
 
import os
 
from multiprocessing.util import Finalize
 

	
 
from kallithea.lib.compat import kill
 

	
 

	
 
class LockHeld(Exception):
 
    pass
 

	
 

	
 
class DaemonLock(object):
 
    """daemon locking
 
    USAGE:
 
    try:
 
        l = DaemonLock('/path/tolockfile',desc='test lock')
 
        main()
 
        l.release()
 
    except LockHeld:
 
        sys.exit(1)
 
    """
 

	
 
    def __init__(self, file_, callbackfn=None,
 
                 desc='daemon lock', debug=False):
 
        self.pidfile = file_
 
        self.callbackfn = callbackfn
 
        self.desc = desc
 
        self.debug = debug
 
        self.held = False
 
        # run the lock automatically!
 
        self.lock()
 
        self._finalize = Finalize(self, DaemonLock._on_finalize,
 
                                  args=(self, debug), exitpriority=10)
 

	
 
    @staticmethod
 
    def _on_finalize(lock, debug):
 
        if lock.held:
 
            if debug:
 
                print('lock held finalizing and running lock.release()')
 
            lock.release()
 

	
 
    def lock(self):
 
        """
 
        locking function, if lock is present it
 
        will raise LockHeld exception
 
        """
 
        lockname = str(os.getpid())
 
        if self.debug:
 
            print('running lock')
 
        self.trylock()
 
        self.makelock(lockname, self.pidfile)
 
        return True
 

	
 
    def trylock(self):
 
        running_pid = False
 
        if self.debug:
 
            print('checking for already running process')
 
        try:
 
            with open(self.pidfile, 'r') as f:
 
                try:
 
                    running_pid = int(f.readline())
 
                except ValueError:
 
                    running_pid = -1
 

	
 
            if self.debug:
 
                print('lock file present running_pid: %s, '
 
                      'checking for execution' % (running_pid,))
 
            # Now we check the PID from lock file matches to the current
 
            # process PID
 
            if running_pid:
 
                try:
 
                    kill(running_pid, 0)
 
                except OSError as exc:
 
                    if exc.errno in (errno.ESRCH, errno.EPERM):
 
                        print ("Lock File is there but"
 
                               " the program is not running")
 
                        print("Removing lock file for the: %s" % running_pid)
 
                        self.release()
 
                    else:
 
                        raise
 
                else:
 
                    print("You already have an instance of the program running")
 
                    print("It is running as process %s" % running_pid)
 
                    raise LockHeld()
 

	
 
        except IOError as e:
 
            if e.errno != 2:
 
                raise
 

	
 
    def release(self):
 
        """releases the pid by removing the pidfile
 
        """
 
        if self.debug:
 
            print('trying to release the pidlock')
 

	
 
        if self.callbackfn:
kallithea/lib/utils2.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils2
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Some simple helper functions.
 
Note: all these functions should be independent of Kallithea classes, i.e.
 
models, controllers, etc.  to prevent import cycles.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 5, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import binascii
 
import datetime
 
import json
 
import os
 
import pwd
 
import re
 
import time
 
import urllib.parse
 

	
 
import urlobject
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from webhelpers2.text import collapse, remove_formatting, strip_tags
 

	
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, safe_bytes, safe_str  # re-export
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
def str2bool(_str):
 
    """
 
    returns True/False value from given string, it tries to translate the
 
    string into boolean
 

	
 
    :param _str: string value to translate into boolean
 
    :rtype: boolean
 
    :returns: boolean from given string
 
    """
 
    if _str is None:
 
        return False
 
    if _str in (True, False):
 
        return _str
 
    _str = str(_str).strip().lower()
 
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
 

	
 

	
 
def aslist(obj, sep=None, strip=True):
 
    """
 
    Returns given string separated by sep as list
 

	
 
    :param obj:
 
    :param sep:
 
    :param strip:
 
    """
 
    if isinstance(obj, (str)):
 
        lst = obj.split(sep)
 
        if strip:
 
            lst = [v.strip() for v in lst]
 
        return lst
 
    elif isinstance(obj, (list, tuple)):
 
        return obj
 
    elif obj is None:
 
        return []
 
    else:
 
        return [obj]
 

	
 

	
 
def convert_line_endings(line, mode):
 
    """
 
    Converts a given line  "line end" according to given mode
 

	
 
    Available modes are::
 
        0 - Unix
 
        1 - Mac
 
        2 - DOS
 

	
 
    :param line: given line to convert
 
    :param mode: mode to convert to
 
    :rtype: str
 
    :return: converted line according to mode
 
    """
 
    if mode == 0:
 
        line = line.replace('\r\n', '\n')
 
        line = line.replace('\r', '\n')
 
    elif mode == 1:
 
        line = line.replace('\r\n', '\r')
 
        line = line.replace('\n', '\r')
 
    elif mode == 2:
 
        line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
 
    return line
 

	
 

	
 
def detect_mode(line, default):
 
    """
 
    Detects line break for given line, if line break couldn't be found
 
    given default value is returned
 

	
 
    :param line: str line
 
    :param default: default
 
    :rtype: int
 
    :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
 
    """
 
    if line.endswith('\r\n'):
 
        return 2
 
    elif line.endswith('\n'):
 
        return 0
 
    elif line.endswith('\r'):
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
"""
 
Utilities aimed to help achieve mostly basic tasks.
 
"""
 
from __future__ import division
 

	
 
import datetime
 
import os
 
import re
 
import time
 

	
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 

	
 
ALIASES = ['hg', 'git']
 

	
 

	
 
def get_scm(path, search_up=False, explicit_alias=None):
 
    """
 
    Returns one of alias from ``ALIASES`` (in order of precedence same as
 
    shortcuts given in ``ALIASES``) and top working dir path for the given
 
    argument. If no scm-specific directory is found or more than one scm is
 
    found at that directory, ``VCSError`` is raised.
 

	
 
    :param search_up: if set to ``True``, this function would try to
 
      move up to parent directory every time no scm is recognized for the
 
      currently checked path. Default: ``False``.
 
    :param explicit_alias: can be one of available backend aliases, when given
 
      it will return given explicit alias in repositories under more than one
 
      version control, if explicit_alias is different than found it will raise
 
      VCSError
 
    """
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %s is not a directory" % path)
 

	
 
    while True:
 
        found_scms = [(scm, path) for scm in get_scms_for_path(path)]
 
        if found_scms or not search_up:
 
            break
 
        newpath = abspath(path, '..')
 
        if newpath == path:
 
            break
 
        path = newpath
 

	
 
    if len(found_scms) > 1:
 
        for scm in found_scms:
 
            if scm[0] == explicit_alias:
 
                return scm
 
        raise VCSError('More than one [%s] scm found at given path %s'
 
                       % (', '.join((x[0] for x in found_scms)), path))
 

	
 
    if len(found_scms) == 0:
 
        raise VCSError('No scm found at given path %s' % path)
 

	
 
    return found_scms[0]
 

	
 

	
 
def get_scms_for_path(path):
 
    """
 
    Returns all scm's found at the given path. If no scm is recognized
 
    - empty list is returned.
 

	
 
    :param path: path to directory which should be checked. May be callable.
 

	
 
    :raises VCSError: if given ``path`` is not a directory
 
    """
 
    from kallithea.lib.vcs.backends import get_backend
 
    if hasattr(path, '__call__'):
 
        path = path()
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %r is not a directory" % path)
 

	
 
    result = []
 
    for key in ALIASES:
 
        # find .hg / .git
 
        dirname = os.path.join(path, '.' + key)
 
        if os.path.isdir(dirname):
 
            result.append(key)
 
            continue
 
        # find rm__.hg / rm__.git too - left overs from old method for deleting
 
        dirname = os.path.join(path, 'rm__.' + key)
 
        if os.path.isdir(dirname):
 
            return result
 
        # We still need to check if it's not bare repository as
 
        # bare repos don't have working directories
 
        try:
 
            get_backend(key)(path)
 
            result.append(key)
 
            continue
 
        except RepositoryError:
 
            # Wrong backend
 
            pass
 
        except VCSError:
 
            # No backend at all
 
            pass
 
    return result
 

	
 

	
 
def get_highlighted_code(name, code, type='terminal'):
 
    """
kallithea/lib/vcs/utils/progressbar.py
Show inline comments
 
# encoding: UTF-8
 

	
 
from __future__ import print_function
 

	
 
import datetime
 
import string
 
import sys
 

	
 
from kallithea.lib.vcs.utils.filesize import filesizeformat
 

	
 

	
 
class ProgressBarError(Exception):
 
    pass
 

	
 

	
 
class AlreadyFinishedError(ProgressBarError):
 
    pass
 

	
 

	
 
class ProgressBar(object):
 

	
 
    default_elements = ['percentage', 'bar', 'steps']
 

	
 
    def __init__(self, steps=100, stream=None, elements=None):
 
        self.step = 0
 
        self.steps = steps
 
        self.stream = stream or sys.stderr
 
        self.bar_char = '='
 
        self.width = 50
 
        self.separator = ' | '
 
        self.elements = elements or self.default_elements
 
        self.started = None
 
        self.finished = False
 
        self.steps_label = 'Step'
 
        self.time_label = 'Time'
 
        self.eta_label = 'ETA'
 
        self.speed_label = 'Speed'
 
        self.transfer_label = 'Transfer'
 

	
 
    def __str__(self):
 
        return self.get_line()
 

	
 
    def __iter__(self):
 
        start = self.step
 
        end = self.steps + 1
 
        for x in range(start, end):
 
            self.render(x)
 
            yield x
 

	
 
    def get_separator(self):
 
        return self.separator
 

	
 
    def get_bar_char(self):
 
        return self.bar_char
 

	
 
    def get_bar(self):
 
        char = self.get_bar_char()
 
        perc = self.get_percentage()
 
        length = int(self.width * perc / 100)
 
        bar = char * length
 
        bar = bar.ljust(self.width)
 
        return bar
 

	
 
    def get_elements(self):
 
        return self.elements
 

	
 
    def get_template(self):
 
        separator = self.get_separator()
 
        elements = self.get_elements()
 
        return string.Template(separator.join((('$%s' % e) for e in elements)))
 

	
 
    def get_total_time(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if not self.started:
 
            return datetime.timedelta()
 
        return current_time - self.started
 

	
 
    def get_rendered_total_time(self):
 
        delta = self.get_total_time()
 
        if not delta:
 
            ttime = '-'
 
        else:
 
            ttime = str(delta)
 
        return '%s %s' % (self.time_label, ttime)
 

	
 
    def get_eta(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if self.step == 0:
 
            return datetime.timedelta()
 
        total_seconds = self.get_total_time().total_seconds()
 
        eta_seconds = total_seconds * self.steps / self.step - total_seconds
 
        return datetime.timedelta(seconds=int(eta_seconds))
 

	
 
    def get_rendered_eta(self):
 
        eta = self.get_eta()
 
        if not eta:
 
            eta = '--:--:--'
 
        else:
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Test suite for vcs push/pull operations.
 

	
 
The tests need Git > 1.8.1.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
from __future__ import print_function
 

	
 
import json
 
import os
 
import re
 
import tempfile
 
import time
 
import urllib.request
 
from subprocess import PIPE, Popen
 
from tempfile import _RandomNameSequence
 

	
 
import pytest
 

	
 
from kallithea import CONFIG
 
from kallithea.lib.utils2 import ascii_bytes, safe_str
 
from kallithea.model.db import CacheInvalidation, Repository, Ui, User, UserIpMap, UserLog
 
from kallithea.model.meta import Session
 
from kallithea.model.ssh_key import SshKeyModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
DEBUG = True
 
HOST = '127.0.0.1:4999'  # test host
 

	
 
fixture = Fixture()
 

	
 

	
 
# Parameterize different kinds of VCS testing - both the kind of VCS and the
 
# access method (HTTP/SSH)
 

	
 
# Mixin for using HTTP and SSH URLs
 
class HttpVcsTest(object):
 
    @staticmethod
 
    def repo_url_param(webserver, repo_name, **kwargs):
 
        return webserver.repo_url(repo_name, **kwargs)
 

	
 
class SshVcsTest(object):
 
    public_keys = {
 
        base.TEST_USER_REGULAR_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost',
 
        base.TEST_USER_ADMIN_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUq== kallithea@localhost',
 
    }
 

	
 
    @classmethod
 
    def repo_url_param(cls, webserver, repo_name, username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS, client_ip=base.IP_ADDR):
 
        user = User.get_by_username(username)
 
        if user.ssh_keys:
 
            ssh_key = user.ssh_keys[0]
 
        else:
 
            sshkeymodel = SshKeyModel()
 
            ssh_key = sshkeymodel.create(user, 'test key', cls.public_keys[user.username])
 
            Session().commit()
 

	
 
        return cls._ssh_param(repo_name, user, ssh_key, client_ip)
 

	
 
# Mixins for using Mercurial and Git
 
class HgVcsTest(object):
 
    repo_type = 'hg'
 
    repo_name = base.HG_REPO
 

	
 
class GitVcsTest(object):
 
    repo_type = 'git'
 
    repo_name = base.GIT_REPO
 

	
 
# Combine mixins to give the combinations we want to parameterize tests with
 
class HgHttpVcsTest(HgVcsTest, HttpVcsTest):
 
    pass
 

	
 
class GitHttpVcsTest(GitVcsTest, HttpVcsTest):
 
    pass
 

	
 
class HgSshVcsTest(HgVcsTest, SshVcsTest):
 
    @staticmethod
 
    def _ssh_param(repo_name, user, ssh_key, client_ip):
 
        # Specify a custom ssh command on the command line
 
        return r"""--config ui.ssh="bash -c 'SSH_ORIGINAL_COMMAND=\"\$2\" SSH_CONNECTION=\"%s 1024 127.0.0.1 22\" kallithea-cli ssh-serve -c %s %s %s' --" ssh://someuser@somehost/%s""" % (
 
            client_ip,
 
            CONFIG['__file__'],
 
            user.user_id,
 
            ssh_key.user_ssh_key_id,
 
            repo_name)
 

	
 
class GitSshVcsTest(GitVcsTest, SshVcsTest):
 
    @staticmethod
 
    def _ssh_param(repo_name, user, ssh_key, client_ip):
 
        # Set a custom ssh command in the global environment
 
        os.environ['GIT_SSH_COMMAND'] = r"""bash -c 'SSH_ORIGINAL_COMMAND="$2" SSH_CONNECTION="%s 1024 127.0.0.1 22" kallithea-cli ssh-serve -c %s %s %s' --""" % (
 
            client_ip,
 
            CONFIG['__file__'],
 
            user.user_id,
 
            ssh_key.user_ssh_key_id)
 
        return "ssh://someuser@somehost/%s""" % repo_name
 

	
 
parametrize_vcs_test = base.parametrize('vt', [
 
    HgHttpVcsTest,
 
    GitHttpVcsTest,
 
    HgSshVcsTest,
kallithea/tests/scripts/manual_test_concurrency.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.scripts.manual_test_concurrency
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Test suite for making push/pull operations
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
from __future__ import print_function
 

	
 
import logging
 
import os
 
import shutil
 
import sys
 
import tempfile
 
from os.path import dirname
 
from subprocess import PIPE, Popen
 

	
 
from paste.deploy import appconfig
 
from sqlalchemy import engine_from_config
 

	
 
from kallithea.config.environment import load_environment
 
from kallithea.lib.auth import get_crypt_password
 
from kallithea.model import meta
 
from kallithea.model.base import init_model
 
from kallithea.model.db import Repository, Ui, User
 
from kallithea.tests.base import HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS
 

	
 

	
 
rel_path = dirname(dirname(dirname(dirname(os.path.abspath(__file__)))))
 
conf = appconfig('config:development.ini', relative_to=rel_path)
 
load_environment(conf.global_conf, conf.local_conf)
 

	
 
USER = TEST_USER_ADMIN_LOGIN
 
PASS = TEST_USER_ADMIN_PASS
 
HOST = 'server.local'
 
METHOD = 'pull'
 
DEBUG = True
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args):
 
        """Runs command on the system with given ``args``.
 
        """
 

	
 
        command = cmd + ' ' + ' '.join(args)
 
        log.debug('Executing %s', command)
 
        if DEBUG:
 
            print(command)
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            print(stdout, stderr)
 
        return stdout, stderr
 

	
 

	
 
def get_session():
 
    engine = engine_from_config(conf, 'sqlalchemy.')
 
    init_model(engine)
 
    sa = meta.Session
 
    return sa
 

	
 

	
 
def create_test_user(force=True):
 
    print('creating test user')
 
    sa = get_session()
 

	
 
    user = sa.query(User).filter(User.username == USER).scalar()
 

	
 
    if force and user is not None:
 
        print('removing current user')
 
        for repo in sa.query(Repository).filter(Repository.user == user).all():
 
            sa.delete(repo)
 
        sa.delete(user)
 
        sa.commit()
 

	
 
    if user is None or force:
 
        print('creating new one')
 
        new_usr = User()
 
        new_usr.username = USER
 
        new_usr.password = get_crypt_password(PASS)
 
        new_usr.email = 'mail@example.com'
 
        new_usr.name = 'test'
 
        new_usr.lastname = 'lasttestname'
 
        new_usr.active = True
 
        new_usr.admin = True
 
        sa.add(new_usr)
 
        sa.commit()
 

	
 
    print('done')
 

	
 

	
 
def create_test_repo(force=True):
 
    print('creating test repo')
 
    from kallithea.model.repo import RepoModel
 
    sa = get_session()
 

	
 
    user = sa.query(User).filter(User.username == USER).scalar()
 
    if user is None:
 
        raise Exception('user not found')
 

	
kallithea/tests/scripts/manual_test_crawler.py
Show inline comments
 
#!/usr/bin/env python3
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.scripts.manual_test_crawler
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Test for crawling a project for memory usage
 
This should be runned just as regular script together
 
with a watch script that will show memory usage.
 

	
 
watch -n1 ./kallithea/tests/mem_watch
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import http.cookiejar
 
import os
 
import sys
 
import tempfile
 
import time
 
import urllib.parse
 
import urllib.request
 
from os.path import dirname
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib.vcs.exceptions import RepositoryError
 

	
 

	
 
__here__ = os.path.abspath(__file__)
 
__root__ = dirname(dirname(dirname(__here__)))
 
sys.path.append(__root__)
 

	
 

	
 
PASES = 3
 
HOST = 'http://127.0.0.1'
 
PORT = 5000
 
BASE_URI = '%s:%s/' % (HOST, PORT)
 

	
 
if len(sys.argv) == 2:
 
    BASE_URI = sys.argv[1]
 

	
 
if not BASE_URI.endswith('/'):
 
    BASE_URI += '/'
 

	
 
print('Crawling @ %s' % BASE_URI)
 
BASE_URI += '%s'
 
PROJECT_PATH = os.path.join('/', 'home', 'username', 'repos')
 
PROJECTS = [
 
    # 'linux-magx-pbranch',
 
    'CPython',
 
    'kallithea',
 
]
 

	
 

	
 
cj = http.cookiejar.FileCookieJar(os.path.join(tempfile.gettempdir(), 'rc_test_cookie.txt'))
 
o = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
 
o.addheaders = [
 
    ('User-agent', 'kallithea-crawler'),
 
    ('Accept-Language', 'en - us, en;q = 0.5')
 
]
 

	
 
urllib.request.install_opener(o)
 

	
 

	
 
def _get_repo(proj):
 
    if isinstance(proj, str):
 
        repo = vcs.get_repo(os.path.join(PROJECT_PATH, proj))
 
        proj = proj
 
    else:
 
        repo = proj
 
        proj = repo.name
 

	
 
    return repo, proj
 

	
 

	
 
def test_changelog_walk(proj, pages=100):
 
    repo, proj = _get_repo(proj)
 

	
 
    total_time = 0
 
    for i in range(1, pages):
 

	
 
        page = '/'.join((proj, 'changelog',))
 

	
 
        full_uri = (BASE_URI % page) + '?' + urllib.parse.urlencode({'page': i})
 
        s = time.time()
 
        f = o.open(full_uri)
 

	
 
        assert f.url == full_uri, 'URL:%s does not match %s' % (f.url, full_uri)
 

	
 
        size = len(f.read())
 
        e = time.time() - s
 
        total_time += e
 
        print('visited %s size:%s req:%s ms' % (full_uri, size, e))
 

	
 
    print('total_time', total_time)
 
    print('average on req', total_time / float(pages))
 

	
 

	
 
def test_changeset_walk(proj, limit=None):
 
    repo, proj = _get_repo(proj)
 

	
 
    print('processing', os.path.join(PROJECT_PATH, proj))
 
    total_time = 0
 

	
 
    cnt = 0
 
    for i in repo:
 
        cnt += 1
 
        raw_cs = '/'.join((proj, 'changeset', i.raw_id))
 
        if limit and limit == cnt:
 
            break
scripts/docs-headings.py
Show inline comments
 
#!/usr/bin/env python3
 

	
 
"""
 
Consistent formatting of rst section titles
 
"""
 

	
 
from __future__ import print_function
 

	
 
import re
 
import subprocess
 

	
 

	
 
spaces = [
 
    (0, 1), # we assume this is a over-and-underlined header
 
    (2, 1),
 
    (1, 1),
 
    (1, 0),
 
    (1, 0),
 
    ]
 

	
 
# http://sphinx-doc.org/rest.html :
 
#   for the Python documentation, this convention is used which you may follow:
 
#   # with overline, for parts
 
#   * with overline, for chapters
 
#   =, for sections
 
#   -, for subsections
 
#   ^, for subsubsections
 
#   ", for paragraphs
 
pystyles = ['#', '*', '=', '-', '^', '"']
 

	
 
# match on a header line underlined with one of the valid characters
 
headermatch = re.compile(r'''\n*(.+)\n([][!"#$%&'()*+,./:;<=>?@\\^_`{|}~-])\2{2,}\n+''', flags=re.MULTILINE)
 

	
 

	
 
def main():
 
    filenames = subprocess.check_output(['hg', 'loc', 'set:**.rst+kallithea/i18n/how_to']).splitlines()
 
    for fn in filenames:
 
        fn = fn.decode()
 
        print('processing %s' % fn)
 
        s = open(fn).read()
 

	
 
        # find levels and their styles
 
        lastpos = 0
 
        styles = []
 
        for markup in headermatch.findall(s):
 
            style = markup[1]
 
            if style in styles:
 
                stylepos = styles.index(style)
 
                if stylepos > lastpos + 1:
 
                    print('bad style %r with level %s - was at %s' % (style, stylepos, lastpos))
 
            else:
 
                stylepos = len(styles)
 
                if stylepos > lastpos + 1:
 
                    print('bad new style %r - expected %r' % (style, styles[lastpos + 1]))
 
                else:
 
                    styles.append(style)
 
            lastpos = stylepos
 

	
 
        # remove superfluous spacing (may however be restored by header spacing)
 
        s = re.sub(r'''(\n\n)\n*''', r'\1', s, flags=re.MULTILINE)
 

	
 
        if styles:
 
            newstyles = pystyles[pystyles.index(styles[0]):]
 

	
 
            def subf(m):
 
                title, style = m.groups()
 
                level = styles.index(style)
 
                before, after = spaces[level]
 
                newstyle = newstyles[level]
 
                return '\n' * (before + 1) + title + '\n' + newstyle * len(title) + '\n' * (after + 1)
 
            s = headermatch.sub(subf, s)
 

	
 
        # remove superfluous spacing when headers are adjacent
 
        s = re.sub(r'''(\n.+\n([][!"#$%&'()*+,./:;<=>?@\\^_`{|}~-])\2{2,}\n\n\n)\n*''', r'\1', s, flags=re.MULTILINE)
 
        # fix trailing space and spacing before link sections
 
        s = s.strip() + '\n'
 
        s = re.sub(r'''\n+((?:\.\. _[^\n]*\n)+)$''', r'\n\n\n\1', s)
 

	
 
        open(fn, 'w').write(s)
 

	
 
    print(subprocess.check_output(['hg', 'diff'] + filenames))
 

	
 
if __name__ == '__main__':
 
    main()
scripts/generate-ini.py
Show inline comments
 
#!/usr/bin/env python3
 
"""
 
Based on kallithea/lib/paster_commands/template.ini.mako, generate development.ini
 
"""
 

	
 
from __future__ import print_function
 

	
 
import re
 

	
 
from kallithea.lib import inifile
 

	
 

	
 
# files to be generated from the mako template
 
ini_files = [
 
    ('development.ini',
 
        {
 
            '[server:main]': {
 
                'host': '0.0.0.0',
 
            },
 
            '[app:main]': {
 
                'debug': 'true',
 
                'app_instance_uuid': 'development-not-secret',
 
                'session.secret': 'development-not-secret',
 
            },
 
            '[logger_root]': {
 
                'handlers': 'console_color',
 
            },
 
            '[logger_routes]': {
 
                'level': 'DEBUG',
 
            },
 
            '[logger_beaker]': {
 
                'level': 'DEBUG',
 
            },
 
            '[logger_templates]': {
 
                'level': 'INFO',
 
            },
 
            '[logger_kallithea]': {
 
                'level': 'DEBUG',
 
            },
 
            '[logger_tg]': {
 
                'level': 'DEBUG',
 
            },
 
            '[logger_gearbox]': {
 
                'level': 'DEBUG',
 
            },
 
            '[logger_whoosh_indexer]': {
 
                'level': 'DEBUG',
 
            },
 
        },
 
    ),
 
]
 

	
 

	
 
def main():
 
    # make sure all mako lines starting with '#' (the '##' comments) are marked up as <text>
 
    makofile = inifile.template_file
 
    print('reading:', makofile)
 
    mako_org = open(makofile).read()
 
    mako_no_text_markup = re.sub(r'</?%text>', '', mako_org)
 
    mako_marked_up = re.sub(r'\n(##.*)', r'\n<%text>\1</%text>', mako_no_text_markup, flags=re.MULTILINE)
 
    if mako_marked_up != mako_org:
 
        print('writing:', makofile)
 
        open(makofile, 'w').write(mako_marked_up)
 

	
 
    # create ini files
 
    for fn, settings in ini_files:
 
        print('updating:', fn)
 
        inifile.create(fn, None, settings)
 

	
 

	
 
if __name__ == '__main__':
 
    main()
scripts/logformat.py
Show inline comments
 
#!/usr/bin/env python3
 

	
 
from __future__ import print_function
 

	
 
import re
 
import sys
 

	
 

	
 
logre = r'''
 
(log\.(?:error|info|warning|debug)
 
[(][ \n]*
 
)
 
%s
 
(
 
[ \n]*[)]
 
)
 
'''
 

	
 

	
 
res = [
 
    # handle % () - keeping spaces around the old %
 
    (re.compile(logre % r'''("[^"]*"|'[^']*')   ([\n ]*) %  ([\n ]*) \( ( (?:[^()]|\n)* (?: \( (?:[^()]|\n)* \) (?:[^()]|\n)* )* ) \) ''', flags=re.MULTILINE | re.VERBOSE), r'\1\2,\3\4\5\6'),
 
    # handle % without () - keeping spaces around the old %
 
    (re.compile(logre % r'''("[^"]*"|'[^']*')   ([\n ]*) %  ([\n ]*)    ( (?:[^()]|\n)* (?: \( (?:[^()]|\n)* \) (?:[^()]|\n)* )* )    ''', flags=re.MULTILINE | re.VERBOSE), r'\1\2,\3\4\5\6'),
 
    # remove extra space if it is on next line
 
    (re.compile(logre % r'''("[^"]*"|'[^']*') , (\n [ ]) ([ ][\n ]*)    ( (?:[^()]|\n)* (?: \( (?:[^()]|\n)* \) (?:[^()]|\n)* )* )    ''', flags=re.MULTILINE | re.VERBOSE), r'\1\2,\3\4\5\6'),
 
    # remove extra space if it is on same line
 
    (re.compile(logre % r'''("[^"]*"|'[^']*') , [ ]+  () (   [\n ]+)    ( (?:[^()]|\n)* (?: \( (?:[^()]|\n)* \) (?:[^()]|\n)* )* )    ''', flags=re.MULTILINE | re.VERBOSE), r'\1\2,\3\4\5\6'),
 
    # remove trailing , and space
 
    (re.compile(logre % r'''("[^"]*"|'[^']*') ,       () (   [\n ]*)    ( (?:[^()]|\n)* (?: \( (?:[^()]|\n)* \) (?:[^()]|\n)* )* [^(), \n] ) [ ,]*''', flags=re.MULTILINE | re.VERBOSE), r'\1\2,\3\4\5\6'),
 
    ]
 

	
 

	
 
def rewrite(f):
 
    s = open(f).read()
 
    for r, t in res:
 
        s = r.sub(t, s)
 
    open(f, 'w').write(s)
 

	
 

	
 
if __name__ == '__main__':
 
    if len(sys.argv) < 2:
 
        print('Cleanup of superfluous % formatting of log statements.')
 
        print('Usage:')
 
        print('''  hg revert `hg loc '*.py'|grep -v logformat.py` && scripts/logformat.py `hg loc '*.py'` && hg diff''')
 
        raise SystemExit(1)
 

	
 
    for f in sys.argv[1:]:
 
        rewrite(f)
0 comments (0 inline, 0 general)