Changeset - f79b864dc192
[Not reviewed]
default
0 8 0
Mads Kiilerich - 6 years ago 2019-08-04 00:13:03
mads@kiilerich.com
Grafted from: 7d137ddb1309
flake8: fix W605 invalid escape sequence
8 files changed with 11 insertions and 11 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_cli_ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import os
 
import re
 
import shlex
 
import sys
 

	
 
import click
 

	
 
import kallithea
 
import kallithea.bin.kallithea_cli_base as cli_base
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.lib.vcs.backends.git.ssh import GitSshHandler
 
from kallithea.lib.vcs.backends.hg.ssh import MercurialSshHandler
 
from kallithea.model.ssh_key import SshKeyModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True, hidden=True)
 
@click.argument('user-id', type=click.INT, required=True)
 
@click.argument('key-id', type=click.INT, required=True)
 
def ssh_serve(user_id, key_id):
 
    """Serve SSH repository protocol access.
 

	
 
    The trusted command that is invoked from .ssh/authorized_keys to serve SSH
 
    protocol access. The access will be granted as the specified user ID, and
 
    logged as using the specified key ID.
 
    """
 
    ssh_enabled = kallithea.CONFIG.get('ssh_enabled', False)
 
    if not str2bool(ssh_enabled):
 
        sys.stderr.write("SSH access is disabled.\n")
 
        return sys.exit(1)
 

	
 
    ssh_locale = kallithea.CONFIG.get('ssh_locale')
 
    if ssh_locale:
 
        os.environ['LC_ALL'] = ssh_locale # trumps everything, including LANG, except LANGUAGE
 
        os.environ['LANGUAGE'] = ssh_locale # trumps LC_ALL for GNU gettext message handling
 

	
 
    ssh_original_command = os.environ.get('SSH_ORIGINAL_COMMAND', '')
 
    connection = re.search('^([\d\.]+)', os.environ.get('SSH_CONNECTION', ''))
 
    connection = re.search(r'^([0-9.]+)', os.environ.get('SSH_CONNECTION', ''))
 
    client_ip = connection.group(1) if connection else '0.0.0.0'
 
    log.debug('ssh-serve was invoked for SSH command %r from %s', ssh_original_command, client_ip)
 

	
 
    if not ssh_original_command:
 
        if os.environ.get('SSH_CONNECTION'):
 
            sys.stderr.write("'kallithea-cli ssh-serve' can only provide protocol access over SSH. Interactive SSH login for this user is disabled.\n")
 
        else:
 
            sys.stderr.write("'kallithea-cli ssh-serve' cannot be called directly. It must be specified as command in an SSH authorized_keys file.\n")
 
        return sys.exit(1)
 

	
 
    try:
 
        ssh_command_parts = shlex.split(ssh_original_command)
 
    except ValueError as e:
 
        sys.stderr.write('Error parsing SSH command %r: %s\n' % (ssh_original_command, e))
 
        sys.exit(1)
 
    for VcsHandler in [MercurialSshHandler, GitSshHandler]:
 
        vcs_handler = VcsHandler.make(ssh_command_parts)
 
        if vcs_handler is not None:
 
            vcs_handler.serve(user_id, key_id, client_ip)
 
            assert False # serve is written so it never will terminate
 

	
 
    sys.stderr.write("This account can only be used for repository access. SSH command %r is not supported.\n" % ssh_original_command)
 
    sys.exit(1)
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
def ssh_update_authorized_keys():
 
    """Update .ssh/authorized_keys file.
 

	
 
    The file is usually maintained automatically, but this command will also re-write it.
 
    """
 

	
 
    SshKeyModel().write_authorized_keys()
kallithea/config/routing.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Routes configuration
 

	
 
The more specific and detailed routes should be defined first so they
 
may take precedent over the more generic routes. For more information
 
refer to the routes manual at http://routes.groovie.org/docs/
 
"""
 

	
 
from routes import Mapper
 
from tg import request
 

	
 

	
 
# prefix for non repository related links needs to be prefixed with `/`
 
ADMIN_PREFIX = '/_admin'
 

	
 

	
 
def make_map(config):
 
    """Create, configure and return the routes Mapper"""
 
    rmap = Mapper(directory=config['paths']['controllers'],
 
                  always_scan=config['debug'])
 
    rmap.minimization = False
 
    rmap.explicit = False
 

	
 
    from kallithea.lib.utils import is_valid_repo, is_valid_repo_group
 

	
 
    def check_repo(environ, match_dict):
 
        """
 
        Check for valid repository for proper 404 handling.
 
        Also, a bit of side effect modifying match_dict ...
 
        """
 
        if match_dict.get('f_path'):
 
            # fix for multiple initial slashes that causes errors
 
            match_dict['f_path'] = match_dict['f_path'].lstrip('/')
 

	
 
        return is_valid_repo(match_dict['repo_name'], config['base_path'])
 

	
 
    def check_group(environ, match_dict):
 
        """
 
        check for valid repository group for proper 404 handling
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        repo_group_name = match_dict.get('group_name')
 
        return is_valid_repo_group(repo_group_name, config['base_path'])
 

	
 
    def check_group_skip_path(environ, match_dict):
 
        """
 
        check for valid repository group for proper 404 handling, but skips
 
        verification of existing path
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        repo_group_name = match_dict.get('group_name')
 
        return is_valid_repo_group(repo_group_name, config['base_path'],
 
                                   skip_path_check=True)
 

	
 
    def check_user_group(environ, match_dict):
 
        """
 
        check for valid user group for proper 404 handling
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        return True
 

	
 
    def check_int(environ, match_dict):
 
        return match_dict.get('id').isdigit()
 

	
 
    #==========================================================================
 
    # CUSTOM ROUTES HERE
 
    #==========================================================================
 

	
 
    # MAIN PAGE
 
    rmap.connect('home', '/', controller='home', action='index')
 
    rmap.connect('about', '/about', controller='home', action='about')
 
    rmap.redirect('/favicon.ico', '/images/favicon.ico')
 
    rmap.connect('repo_switcher_data', '/_repos', controller='home',
 
                 action='repo_switcher_data')
 
    rmap.connect('users_and_groups_data', '/_users_and_groups', controller='home',
 
                 action='users_and_groups_data')
 

	
 
    rmap.connect('rst_help',
 
                 "http://docutils.sourceforge.net/docs/user/rst/quickref.html",
 
                 _static=True)
 
    rmap.connect('kallithea_project_url', "https://kallithea-scm.org/", _static=True)
 
    rmap.connect('issues_url', 'https://bitbucket.org/conservancy/kallithea/issues', _static=True)
 

	
 
    # ADMIN REPOSITORY ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/repos') as m:
 
        m.connect("repos", "/repos",
 
                  action="create", conditions=dict(method=["POST"]))
 
        m.connect("repos", "/repos",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("new_repo", "/create_repository",
 
                  action="create_repository", conditions=dict(method=["GET"]))
 
        m.connect("update_repo", "/repos/{repo_name:.*?}",
 
                  action="update", conditions=dict(method=["POST"],
 
                  function=check_repo))
 
        m.connect("delete_repo", "/repos/{repo_name:.*?}/delete",
 
                  action="delete", conditions=dict(method=["POST"]))
 

	
 
    # ADMIN REPOSITORY GROUPS ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/repo_groups') as m:
 
        m.connect("repos_groups", "/repo_groups",
 
                  action="create", conditions=dict(method=["POST"]))
 
        m.connect("repos_groups", "/repo_groups",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("new_repos_group", "/repo_groups/new",
 
                  action="new", conditions=dict(method=["GET"]))
 
        m.connect("update_repos_group", "/repo_groups/{group_name:.*?}",
 
                  action="update", conditions=dict(method=["POST"],
 
                                                   function=check_group))
 

	
 
        m.connect("repos_group", "/repo_groups/{group_name:.*?}",
 
                  action="show", conditions=dict(method=["GET"],
 
                                                 function=check_group))
 

	
 
        # EXTRAS REPO GROUP ROUTES
 
        m.connect("edit_repo_group", "/repo_groups/{group_name:.*?}/edit",
 
                  action="edit",
 
                  conditions=dict(method=["GET"], function=check_group))
 

	
 
        m.connect("edit_repo_group_advanced", "/repo_groups/{group_name:.*?}/edit/advanced",
 
                  action="edit_repo_group_advanced",
 
                  conditions=dict(method=["GET"], function=check_group))
 

	
 
        m.connect("edit_repo_group_perms", "/repo_groups/{group_name:.*?}/edit/permissions",
 
                  action="edit_repo_group_perms",
 
                  conditions=dict(method=["GET"], function=check_group))
 
        m.connect("edit_repo_group_perms_update", "/repo_groups/{group_name:.*?}/edit/permissions",
 
                  action="update_perms",
 
                  conditions=dict(method=["POST"], function=check_group))
 
        m.connect("edit_repo_group_perms_delete", "/repo_groups/{group_name:.*?}/edit/permissions/delete",
 
                  action="delete_perms",
 
                  conditions=dict(method=["POST"], function=check_group))
 

	
 
        m.connect("delete_repo_group", "/repo_groups/{group_name:.*?}/delete",
 
                  action="delete", conditions=dict(method=["POST"],
 
                                                   function=check_group_skip_path))
 

	
 
    # ADMIN USER ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/users') as m:
 
        m.connect("new_user", "/users/new",
 
                  action="create", conditions=dict(method=["POST"]))
 
        m.connect("users", "/users",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("formatted_users", "/users.{format}",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("new_user", "/users/new",
 
                  action="new", conditions=dict(method=["GET"]))
 
        m.connect("update_user", "/users/{id}",
 
                  action="update", conditions=dict(method=["POST"]))
 
        m.connect("delete_user", "/users/{id}/delete",
 
                  action="delete", conditions=dict(method=["POST"]))
 
        m.connect("edit_user", "/users/{id}/edit",
 
                  action="edit", conditions=dict(method=["GET"]))
 

	
 
        # EXTRAS USER ROUTES
 
        m.connect("edit_user_advanced", "/users/{id}/edit/advanced",
 
                  action="edit_advanced", conditions=dict(method=["GET"]))
 

	
 
        m.connect("edit_user_api_keys", "/users/{id}/edit/api_keys",
 
                  action="edit_api_keys", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_api_keys_update", "/users/{id}/edit/api_keys",
 
                  action="add_api_key", conditions=dict(method=["POST"]))
 
        m.connect("edit_user_api_keys_delete", "/users/{id}/edit/api_keys/delete",
 
                  action="delete_api_key", conditions=dict(method=["POST"]))
 

	
 
        m.connect("edit_user_ssh_keys", "/users/{id}/edit/ssh_keys",
 
                  action="edit_ssh_keys", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_ssh_keys", "/users/{id}/edit/ssh_keys",
 
                  action="ssh_keys_add", conditions=dict(method=["POST"]))
 
        m.connect("edit_user_ssh_keys_delete", "/users/{id}/edit/ssh_keys/delete",
 
                  action="ssh_keys_delete", conditions=dict(method=["POST"]))
 

	
 
        m.connect("edit_user_perms", "/users/{id}/edit/permissions",
 
                  action="edit_perms", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_perms_update", "/users/{id}/edit/permissions",
 
                  action="update_perms", conditions=dict(method=["POST"]))
 

	
 
        m.connect("edit_user_emails", "/users/{id}/edit/emails",
 
                  action="edit_emails", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_emails_update", "/users/{id}/edit/emails",
 
                  action="add_email", conditions=dict(method=["POST"]))
 
        m.connect("edit_user_emails_delete", "/users/{id}/edit/emails/delete",
 
                  action="delete_email", conditions=dict(method=["POST"]))
 

	
 
        m.connect("edit_user_ips", "/users/{id}/edit/ips",
 
                  action="edit_ips", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_ips_update", "/users/{id}/edit/ips",
 
                  action="add_ip", conditions=dict(method=["POST"]))
 
        m.connect("edit_user_ips_delete", "/users/{id}/edit/ips/delete",
 
                  action="delete_ip", conditions=dict(method=["POST"]))
 

	
 
    # ADMIN USER GROUPS REST ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/user_groups') as m:
 
        m.connect("users_groups", "/user_groups",
 
                  action="create", conditions=dict(method=["POST"]))
 
        m.connect("users_groups", "/user_groups",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("new_users_group", "/user_groups/new",
 
                  action="new", conditions=dict(method=["GET"]))
 
        m.connect("update_users_group", "/user_groups/{id}",
 
                  action="update", conditions=dict(method=["POST"]))
 
        m.connect("delete_users_group", "/user_groups/{id}/delete",
 
                  action="delete", conditions=dict(method=["POST"]))
 
        m.connect("edit_users_group", "/user_groups/{id}/edit",
 
                  action="edit", conditions=dict(method=["GET"]),
 
                  function=check_user_group)
 

	
 
        # EXTRAS USER GROUP ROUTES
 
        m.connect("edit_user_group_default_perms", "/user_groups/{id}/edit/default_perms",
 
                  action="edit_default_perms", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_group_default_perms_update", "/user_groups/{id}/edit/default_perms",
 
                  action="update_default_perms", conditions=dict(method=["POST"]))
 

	
 
        m.connect("edit_user_group_perms", "/user_groups/{id}/edit/perms",
 
                  action="edit_perms", conditions=dict(method=["GET"]))
 
        m.connect("edit_user_group_perms_update", "/user_groups/{id}/edit/perms",
 
                  action="update_perms", conditions=dict(method=["POST"]))
 
        m.connect("edit_user_group_perms_delete", "/user_groups/{id}/edit/perms/delete",
 
                  action="delete_perms", conditions=dict(method=["POST"]))
 

	
 
        m.connect("edit_user_group_advanced", "/user_groups/{id}/edit/advanced",
 
                  action="edit_advanced", conditions=dict(method=["GET"]))
 

	
 
        m.connect("edit_user_group_members", "/user_groups/{id}/edit/members",
 
                  action="edit_members", conditions=dict(method=["GET"]))
 

	
 
    # ADMIN PERMISSIONS ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/permissions') as m:
 
        m.connect("admin_permissions", "/permissions",
 
                  action="permission_globals", conditions=dict(method=["POST"]))
 
        m.connect("admin_permissions", "/permissions",
 
                  action="permission_globals", conditions=dict(method=["GET"]))
 

	
 
        m.connect("admin_permissions_ips", "/permissions/ips",
 
                  action="permission_ips", conditions=dict(method=["GET"]))
 

	
 
        m.connect("admin_permissions_perms", "/permissions/perms",
 
                  action="permission_perms", conditions=dict(method=["GET"]))
 

	
 
    # ADMIN DEFAULTS ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/defaults') as m:
 
        m.connect('defaults', 'defaults',
 
                  action="index")
 
        m.connect('defaults_update', 'defaults/{id}/update',
 
                  action="update", conditions=dict(method=["POST"]))
 

	
 
    # ADMIN AUTH SETTINGS
 
    rmap.connect('auth_settings', '%s/auth' % ADMIN_PREFIX,
 
                 controller='admin/auth_settings', action='auth_settings',
 
                 conditions=dict(method=["POST"]))
 
    rmap.connect('auth_home', '%s/auth' % ADMIN_PREFIX,
 
                 controller='admin/auth_settings')
 

	
 
    # ADMIN SETTINGS ROUTES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/settings') as m:
 
        m.connect("admin_settings", "/settings",
 
                  action="settings_vcs", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings", "/settings",
 
                  action="settings_vcs", conditions=dict(method=["GET"]))
 

	
 
        m.connect("admin_settings_mapping", "/settings/mapping",
 
                  action="settings_mapping", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings_mapping", "/settings/mapping",
 
                  action="settings_mapping", conditions=dict(method=["GET"]))
 

	
 
        m.connect("admin_settings_global", "/settings/global",
 
                  action="settings_global", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings_global", "/settings/global",
 
                  action="settings_global", conditions=dict(method=["GET"]))
 

	
 
        m.connect("admin_settings_visual", "/settings/visual",
 
                  action="settings_visual", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings_visual", "/settings/visual",
 
                  action="settings_visual", conditions=dict(method=["GET"]))
 

	
 
        m.connect("admin_settings_email", "/settings/email",
 
                  action="settings_email", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings_email", "/settings/email",
 
                  action="settings_email", conditions=dict(method=["GET"]))
 

	
 
        m.connect("admin_settings_hooks", "/settings/hooks",
 
                  action="settings_hooks", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings_hooks_delete", "/settings/hooks/delete",
 
                  action="settings_hooks", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings_hooks", "/settings/hooks",
 
                  action="settings_hooks", conditions=dict(method=["GET"]))
 

	
 
        m.connect("admin_settings_search", "/settings/search",
 
                  action="settings_search", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings_search", "/settings/search",
 
                  action="settings_search", conditions=dict(method=["GET"]))
 

	
 
        m.connect("admin_settings_system", "/settings/system",
 
                  action="settings_system", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings_system", "/settings/system",
 
                  action="settings_system", conditions=dict(method=["GET"]))
 
        m.connect("admin_settings_system_update", "/settings/system/updates",
 
                  action="settings_system_update", conditions=dict(method=["GET"]))
 

	
 
    # ADMIN MY ACCOUNT
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/my_account') as m:
 

	
 
        m.connect("my_account", "/my_account",
 
                  action="my_account", conditions=dict(method=["GET"]))
 
        m.connect("my_account", "/my_account",
 
                  action="my_account", conditions=dict(method=["POST"]))
 

	
 
        m.connect("my_account_password", "/my_account/password",
 
                  action="my_account_password", conditions=dict(method=["GET"]))
 
        m.connect("my_account_password", "/my_account/password",
 
                  action="my_account_password", conditions=dict(method=["POST"]))
 

	
 
        m.connect("my_account_repos", "/my_account/repos",
 
                  action="my_account_repos", conditions=dict(method=["GET"]))
 

	
 
        m.connect("my_account_watched", "/my_account/watched",
 
                  action="my_account_watched", conditions=dict(method=["GET"]))
 

	
 
        m.connect("my_account_perms", "/my_account/perms",
 
                  action="my_account_perms", conditions=dict(method=["GET"]))
 

	
 
        m.connect("my_account_emails", "/my_account/emails",
 
                  action="my_account_emails", conditions=dict(method=["GET"]))
 
        m.connect("my_account_emails", "/my_account/emails",
 
                  action="my_account_emails_add", conditions=dict(method=["POST"]))
 
        m.connect("my_account_emails_delete", "/my_account/emails/delete",
 
                  action="my_account_emails_delete", conditions=dict(method=["POST"]))
 

	
 
        m.connect("my_account_api_keys", "/my_account/api_keys",
 
                  action="my_account_api_keys", conditions=dict(method=["GET"]))
 
        m.connect("my_account_api_keys", "/my_account/api_keys",
 
                  action="my_account_api_keys_add", conditions=dict(method=["POST"]))
 
        m.connect("my_account_api_keys_delete", "/my_account/api_keys/delete",
 
                  action="my_account_api_keys_delete", conditions=dict(method=["POST"]))
 

	
 
        m.connect("my_account_ssh_keys", "/my_account/ssh_keys",
 
                  action="my_account_ssh_keys", conditions=dict(method=["GET"]))
 
        m.connect("my_account_ssh_keys", "/my_account/ssh_keys",
 
                  action="my_account_ssh_keys_add", conditions=dict(method=["POST"]))
 
        m.connect("my_account_ssh_keys_delete", "/my_account/ssh_keys/delete",
 
                  action="my_account_ssh_keys_delete", conditions=dict(method=["POST"]))
 

	
 
    # ADMIN GIST
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/gists') as m:
 
        m.connect("gists", "/gists",
 
                  action="create", conditions=dict(method=["POST"]))
 
        m.connect("gists", "/gists",
 
                  action="index", conditions=dict(method=["GET"]))
 
        m.connect("new_gist", "/gists/new",
 
                  action="new", conditions=dict(method=["GET"]))
 

	
 
        m.connect("gist_delete", "/gists/{gist_id}/delete",
 
                  action="delete", conditions=dict(method=["POST"]))
 
        m.connect("edit_gist", "/gists/{gist_id}/edit",
 
                  action="edit", conditions=dict(method=["GET", "POST"]))
 
        m.connect("edit_gist_check_revision", "/gists/{gist_id}/edit/check_revision",
 
                  action="check_revision", conditions=dict(method=["POST"]))
 

	
 
        m.connect("gist", "/gists/{gist_id}",
 
                  action="show", conditions=dict(method=["GET"]))
 
        m.connect("gist_rev", "/gists/{gist_id}/{revision}",
 
                  revision="tip",
 
                  action="show", conditions=dict(method=["GET"]))
 
        m.connect("formatted_gist", "/gists/{gist_id}/{revision}/{format}",
 
                  revision="tip",
 
                  action="show", conditions=dict(method=["GET"]))
 
        m.connect("formatted_gist_file", "/gists/{gist_id}/{revision}/{format}/{f_path:.*}",
 
                  revision='tip',
 
                  action="show", conditions=dict(method=["GET"]))
 

	
 
    # ADMIN MAIN PAGES
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX,
 
                        controller='admin/admin') as m:
 
        m.connect('admin_home', '', action='index')
 
        m.connect('admin_add_repo', '/add_repo/{new_repo:[a-z0-9\. _-]*}',
 
        m.connect('admin_add_repo', '/add_repo/{new_repo:[a-z0-9. _-]*}',
 
                  action='add_repo')
 
    #==========================================================================
 
    # API V2
 
    #==========================================================================
 
    with rmap.submapper(path_prefix=ADMIN_PREFIX, controller='api/api',
 
                        action='_dispatch') as m:
 
        m.connect('api', '/api')
 

	
 
    # USER JOURNAL
 
    rmap.connect('journal', '%s/journal' % ADMIN_PREFIX,
 
                 controller='journal', action='index')
 
    rmap.connect('journal_rss', '%s/journal/rss' % ADMIN_PREFIX,
 
                 controller='journal', action='journal_rss')
 
    rmap.connect('journal_atom', '%s/journal/atom' % ADMIN_PREFIX,
 
                 controller='journal', action='journal_atom')
 

	
 
    rmap.connect('public_journal', '%s/public_journal' % ADMIN_PREFIX,
 
                 controller='journal', action="public_journal")
 

	
 
    rmap.connect('public_journal_rss', '%s/public_journal/rss' % ADMIN_PREFIX,
 
                 controller='journal', action="public_journal_rss")
 

	
 
    rmap.connect('public_journal_rss_old', '%s/public_journal_rss' % ADMIN_PREFIX,
 
                 controller='journal', action="public_journal_rss")
 

	
 
    rmap.connect('public_journal_atom',
 
                 '%s/public_journal/atom' % ADMIN_PREFIX, controller='journal',
 
                 action="public_journal_atom")
 

	
 
    rmap.connect('public_journal_atom_old',
 
                 '%s/public_journal_atom' % ADMIN_PREFIX, controller='journal',
 
                 action="public_journal_atom")
 

	
 
    rmap.connect('toggle_following', '%s/toggle_following' % ADMIN_PREFIX,
 
                 controller='journal', action='toggle_following',
 
                 conditions=dict(method=["POST"]))
 

	
 
    # SEARCH
 
    rmap.connect('search', '%s/search' % ADMIN_PREFIX, controller='search',)
 
    rmap.connect('search_repo_admin', '%s/search/{repo_name:.*}' % ADMIN_PREFIX,
 
                 controller='search',
 
                 conditions=dict(function=check_repo))
 
    rmap.connect('search_repo', '/{repo_name:.*?}/search',
 
                 controller='search',
 
                 conditions=dict(function=check_repo),
 
                 )
 

	
 
    # LOGIN/LOGOUT/REGISTER/SIGN IN
 
    rmap.connect('session_csrf_secret_token', '%s/session_csrf_secret_token' % ADMIN_PREFIX, controller='login', action='session_csrf_secret_token')
 
    rmap.connect('login_home', '%s/login' % ADMIN_PREFIX, controller='login')
 
    rmap.connect('logout_home', '%s/logout' % ADMIN_PREFIX, controller='login',
 
                 action='logout')
 

	
 
    rmap.connect('register', '%s/register' % ADMIN_PREFIX, controller='login',
 
                 action='register')
 

	
 
    rmap.connect('reset_password', '%s/password_reset' % ADMIN_PREFIX,
 
                 controller='login', action='password_reset')
 

	
 
    rmap.connect('reset_password_confirmation',
 
                 '%s/password_reset_confirmation' % ADMIN_PREFIX,
 
                 controller='login', action='password_reset_confirmation')
 

	
 
    # FEEDS
 
    rmap.connect('rss_feed_home', '/{repo_name:.*?}/feed/rss',
 
                controller='feed', action='rss',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('atom_feed_home', '/{repo_name:.*?}/feed/atom',
 
                controller='feed', action='atom',
 
                conditions=dict(function=check_repo))
 

	
 
    #==========================================================================
 
    # REPOSITORY ROUTES
 
    #==========================================================================
 
    rmap.connect('repo_creating_home', '/{repo_name:.*?}/repo_creating',
 
                controller='admin/repos', action='repo_creating')
 
    rmap.connect('repo_check_home', '/{repo_name:.*?}/crepo_check',
 
                controller='admin/repos', action='repo_check')
 

	
 
    rmap.connect('summary_home', '/{repo_name:.*?}',
 
                controller='summary',
 
                conditions=dict(function=check_repo))
 

	
 
    # must be here for proper group/repo catching
 
    rmap.connect('repos_group_home', '/{group_name:.*}',
 
                controller='admin/repo_groups', action="show_by_name",
 
                conditions=dict(function=check_group))
 
    rmap.connect('repo_stats_home', '/{repo_name:.*?}/statistics',
 
                controller='summary', action='statistics',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('repo_size', '/{repo_name:.*?}/repo_size',
 
                controller='summary', action='repo_size',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('repo_refs_data', '/{repo_name:.*?}/refs-data',
 
                 controller='home', action='repo_refs_data')
 

	
 
    rmap.connect('changeset_home', '/{repo_name:.*?}/changeset/{revision:.*}',
 
                controller='changeset', revision='tip',
 
                conditions=dict(function=check_repo))
 
    rmap.connect('changeset_children', '/{repo_name:.*?}/changeset_children/{revision}',
 
                controller='changeset', revision='tip', action="changeset_children",
 
                conditions=dict(function=check_repo))
 
    rmap.connect('changeset_parents', '/{repo_name:.*?}/changeset_parents/{revision}',
 
                controller='changeset', revision='tip', action="changeset_parents",
 
                conditions=dict(function=check_repo))
 

	
 
    # repo edit options
 
    rmap.connect("edit_repo", "/{repo_name:.*?}/settings",
 
                 controller='admin/repos', action="edit",
 
                 conditions=dict(method=["GET"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_perms", "/{repo_name:.*?}/settings/permissions",
 
                 controller='admin/repos', action="edit_permissions",
 
                 conditions=dict(method=["GET"], function=check_repo))
 
    rmap.connect("edit_repo_perms_update", "/{repo_name:.*?}/settings/permissions",
 
                 controller='admin/repos', action="edit_permissions_update",
 
                 conditions=dict(method=["POST"], function=check_repo))
 
    rmap.connect("edit_repo_perms_revoke", "/{repo_name:.*?}/settings/permissions/delete",
 
                 controller='admin/repos', action="edit_permissions_revoke",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_fields", "/{repo_name:.*?}/settings/fields",
 
                 controller='admin/repos', action="edit_fields",
 
                 conditions=dict(method=["GET"], function=check_repo))
 
    rmap.connect('create_repo_fields', "/{repo_name:.*?}/settings/fields/new",
 
                 controller='admin/repos', action="create_repo_field",
 
                 conditions=dict(method=["POST"], function=check_repo))
 
    rmap.connect('delete_repo_fields', "/{repo_name:.*?}/settings/fields/{field_id}/delete",
 
                 controller='admin/repos', action="delete_repo_field",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_advanced", "/{repo_name:.*?}/settings/advanced",
 
                 controller='admin/repos', action="edit_advanced",
 
                 conditions=dict(method=["GET"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_advanced_journal", "/{repo_name:.*?}/settings/advanced/journal",
 
                 controller='admin/repos', action="edit_advanced_journal",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_advanced_fork", "/{repo_name:.*?}/settings/advanced/fork",
 
                 controller='admin/repos', action="edit_advanced_fork",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_caches", "/{repo_name:.*?}/settings/caches",
 
                 controller='admin/repos', action="edit_caches",
 
                 conditions=dict(method=["GET"], function=check_repo))
 
    rmap.connect("update_repo_caches", "/{repo_name:.*?}/settings/caches",
 
                 controller='admin/repos', action="edit_caches",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_remote", "/{repo_name:.*?}/settings/remote",
 
                 controller='admin/repos', action="edit_remote",
 
                 conditions=dict(method=["GET"], function=check_repo))
 
    rmap.connect("edit_repo_remote_update", "/{repo_name:.*?}/settings/remote",
 
                 controller='admin/repos', action="edit_remote",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_statistics", "/{repo_name:.*?}/settings/statistics",
 
                 controller='admin/repos', action="edit_statistics",
 
                 conditions=dict(method=["GET"], function=check_repo))
 
    rmap.connect("edit_repo_statistics_update", "/{repo_name:.*?}/settings/statistics",
 
                 controller='admin/repos', action="edit_statistics",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    # still working url for backward compat.
 
    rmap.connect('raw_changeset_home_depraced',
 
                 '/{repo_name:.*?}/raw-changeset/{revision}',
 
                 controller='changeset', action='changeset_raw',
 
                 revision='tip', conditions=dict(function=check_repo))
 

	
 
    ## new URLs
 
    rmap.connect('changeset_raw_home',
 
                 '/{repo_name:.*?}/changeset-diff/{revision}',
 
                 controller='changeset', action='changeset_raw',
 
                 revision='tip', conditions=dict(function=check_repo))
 

	
 
    rmap.connect('changeset_patch_home',
 
                 '/{repo_name:.*?}/changeset-patch/{revision}',
 
                 controller='changeset', action='changeset_patch',
 
                 revision='tip', conditions=dict(function=check_repo))
 

	
 
    rmap.connect('changeset_download_home',
 
                 '/{repo_name:.*?}/changeset-download/{revision}',
 
                 controller='changeset', action='changeset_download',
 
                 revision='tip', conditions=dict(function=check_repo))
 

	
 
    rmap.connect('changeset_comment',
 
                 '/{repo_name:.*?}/changeset-comment/{revision}',
 
                controller='changeset', revision='tip', action='comment',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('changeset_comment_delete',
 
                 '/{repo_name:.*?}/changeset-comment/{comment_id}/delete',
 
                controller='changeset', action='delete_comment',
 
                conditions=dict(function=check_repo, method=["POST"]))
 

	
 
    rmap.connect('changeset_info', '/changeset_info/{repo_name:.*?}/{revision}',
 
                 controller='changeset', action='changeset_info')
 

	
 
    rmap.connect('compare_home',
 
                 '/{repo_name:.*?}/compare',
 
                 controller='compare', action='index',
 
                 conditions=dict(function=check_repo))
 

	
 
    rmap.connect('compare_url',
 
                 '/{repo_name:.*?}/compare/{org_ref_type}@{org_ref_name:.*?}...{other_ref_type}@{other_ref_name:.*?}',
 
                 controller='compare', action='compare',
 
                 conditions=dict(function=check_repo),
 
                 requirements=dict(
 
                            org_ref_type='(branch|book|tag|rev|__other_ref_type__)',
 
                            other_ref_type='(branch|book|tag|rev|__org_ref_type__)')
 
                 )
 

	
 
    rmap.connect('pullrequest_home',
 
                 '/{repo_name:.*?}/pull-request/new', controller='pullrequests',
 
                 action='index', conditions=dict(function=check_repo,
 
                                                 method=["GET"]))
 

	
 
    rmap.connect('pullrequest_repo_info',
 
                 '/{repo_name:.*?}/pull-request-repo-info',
 
                 controller='pullrequests', action='repo_info',
 
                 conditions=dict(function=check_repo, method=["GET"]))
 

	
 
    rmap.connect('pullrequest',
 
                 '/{repo_name:.*?}/pull-request/new', controller='pullrequests',
 
                 action='create', conditions=dict(function=check_repo,
 
                                                  method=["POST"]))
 

	
 
    rmap.connect('pullrequest_show',
 
                 '/{repo_name:.*?}/pull-request/{pull_request_id:\\d+}{extra:(/.*)?}', extra='',
 
                 controller='pullrequests',
 
                 action='show', conditions=dict(function=check_repo,
 
                                                method=["GET"]))
 
    rmap.connect('pullrequest_post',
 
                 '/{repo_name:.*?}/pull-request/{pull_request_id}',
 
                 controller='pullrequests',
 
                 action='post', conditions=dict(function=check_repo,
 
                                                method=["POST"]))
 
    rmap.connect('pullrequest_delete',
 
                 '/{repo_name:.*?}/pull-request/{pull_request_id}/delete',
 
                 controller='pullrequests',
 
                 action='delete', conditions=dict(function=check_repo,
 
                                                  method=["POST"]))
 

	
 
    rmap.connect('pullrequest_show_all',
 
                 '/{repo_name:.*?}/pull-request',
 
                 controller='pullrequests',
 
                 action='show_all', conditions=dict(function=check_repo,
 
                                                method=["GET"]))
 

	
 
    rmap.connect('my_pullrequests',
 
                 '/my_pullrequests',
 
                 controller='pullrequests',
 
                 action='show_my', conditions=dict(method=["GET"]))
 

	
 
    rmap.connect('pullrequest_comment',
 
                 '/{repo_name:.*?}/pull-request-comment/{pull_request_id}',
 
                 controller='pullrequests',
 
                 action='comment', conditions=dict(function=check_repo,
 
                                                method=["POST"]))
 

	
 
    rmap.connect('pullrequest_comment_delete',
 
                 '/{repo_name:.*?}/pull-request-comment/{comment_id}/delete',
 
                controller='pullrequests', action='delete_comment',
 
                conditions=dict(function=check_repo, method=["POST"]))
 

	
 
    rmap.connect('summary_home_summary', '/{repo_name:.*?}/summary',
 
                controller='summary', conditions=dict(function=check_repo))
 

	
 
    rmap.connect('changelog_home', '/{repo_name:.*?}/changelog',
 
                controller='changelog', conditions=dict(function=check_repo))
 

	
 
    rmap.connect('changelog_file_home', '/{repo_name:.*?}/changelog/{revision}/{f_path:.*}',
 
                controller='changelog', f_path=None,
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('changelog_details', '/{repo_name:.*?}/changelog_details/{cs}',
 
                controller='changelog', action='changelog_details',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_home', '/{repo_name:.*?}/files/{revision}/{f_path:.*}',
 
                controller='files', revision='tip', f_path='',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_home_nopath', '/{repo_name:.*?}/files/{revision}',
 
                controller='files', revision='tip', f_path='',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_history_home',
 
                 '/{repo_name:.*?}/history/{revision}/{f_path:.*}',
 
                 controller='files', action='history', revision='tip', f_path='',
 
                 conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_authors_home',
 
                 '/{repo_name:.*?}/authors/{revision}/{f_path:.*}',
 
                 controller='files', action='authors', revision='tip', f_path='',
 
                 conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_diff_home', '/{repo_name:.*?}/diff/{f_path:.*}',
 
                controller='files', action='diff', revision='tip', f_path='',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_diff_2way_home', '/{repo_name:.*?}/diff-2way/{f_path:.+}',
 
                controller='files', action='diff_2way', revision='tip', f_path='',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_rawfile_home',
 
                 '/{repo_name:.*?}/rawfile/{revision}/{f_path:.*}',
 
                 controller='files', action='rawfile', revision='tip',
 
                 f_path='', conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_raw_home',
 
                 '/{repo_name:.*?}/raw/{revision}/{f_path:.*}',
 
                 controller='files', action='raw', revision='tip', f_path='',
 
                 conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_annotate_home',
 
                 '/{repo_name:.*?}/annotate/{revision}/{f_path:.*}',
 
                 controller='files', action='index', revision='tip',
 
                 f_path='', annotate=True, conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_edit_home',
 
                 '/{repo_name:.*?}/edit/{revision}/{f_path:.*}',
 
                 controller='files', action='edit', revision='tip',
 
                 f_path='', conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_add_home',
 
                 '/{repo_name:.*?}/add/{revision}/{f_path:.*}',
 
                 controller='files', action='add', revision='tip',
 
                 f_path='', conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_delete_home',
 
                 '/{repo_name:.*?}/delete/{revision}/{f_path:.*}',
 
                 controller='files', action='delete', revision='tip',
 
                 f_path='', conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_archive_home', '/{repo_name:.*?}/archive/{fname}',
 
                controller='files', action='archivefile',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('files_nodelist_home',
 
                 '/{repo_name:.*?}/nodelist/{revision}/{f_path:.*}',
 
                controller='files', action='nodelist',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('repo_fork_create_home', '/{repo_name:.*?}/fork',
 
                controller='forks', action='fork_create',
 
                conditions=dict(function=check_repo, method=["POST"]))
 

	
 
    rmap.connect('repo_fork_home', '/{repo_name:.*?}/fork',
 
                controller='forks', action='fork',
 
                conditions=dict(function=check_repo))
 

	
 
    rmap.connect('repo_forks_home', '/{repo_name:.*?}/forks',
 
                 controller='forks', action='forks',
 
                 conditions=dict(function=check_repo))
 

	
 
    rmap.connect('repo_followers_home', '/{repo_name:.*?}/followers',
 
                 controller='followers', action='followers',
 
                 conditions=dict(function=check_repo))
 

	
 
    return rmap
 

	
 

	
 
class UrlGenerator(object):
 
    """Emulate pylons.url in providing a wrapper around routes.url
 

	
 
    This code was added during migration from Pylons to Turbogears2. Pylons
 
    already provided a wrapper like this, but Turbogears2 does not.
 

	
 
    When the routing of Kallithea is changed to use less Routes and more
 
    Turbogears2-style routing, this class may disappear or change.
 

	
 
    url() (the __call__ method) returns the URL based on a route name and
 
    arguments.
 
    url.current() returns the URL of the current page with arguments applied.
 

	
 
    Refer to documentation of Routes for details:
 
    https://routes.readthedocs.io/en/latest/generating.html#generation
 
    """
 
    def __call__(self, *args, **kwargs):
 
        return request.environ['routes.url'](*args, **kwargs)
 

	
 
    def current(self, *args, **kwargs):
 
        return request.environ['routes.url'].current(*args, **kwargs)
 

	
 

	
 
url = UrlGenerator()
kallithea/lib/auth_modules/auth_pam.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_modules.auth_pam
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea authentication library for PAM
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Apr 09, 2013
 
:author: Alexey Larikov
 
"""
 

	
 
import grp
 
import logging
 
import pwd
 
import re
 
import socket
 
import threading
 
import time
 

	
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import formatted_json, hybrid_property
 

	
 

	
 
try:
 
    from pam import authenticate as pam_authenticate
 
except ImportError:
 
    # work around pam.authenticate missing in python-pam 1.8.*
 
    from pam import pam
 
    pam_authenticate = pam().authenticate
 

	
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
# Cache to store PAM authenticated users
 
_auth_cache = dict()
 
_pam_lock = threading.Lock()
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
 
    # PAM authnetication can be slow. Repository operations involve a lot of
 
    # auth calls. Little caching helps speedup push/pull operations significantly
 
    AUTH_CACHE_TTL = 4
 

	
 
    def __init__(self):
 
        global _auth_cache
 
        ts = time.time()
 
        cleared_cache = dict(
 
            [(k, v) for (k, v) in _auth_cache.items() if
 
             (v + KallitheaAuthPlugin.AUTH_CACHE_TTL > ts)])
 
        _auth_cache = cleared_cache
 

	
 
    @hybrid_property
 
    def name(self):
 
        return "pam"
 

	
 
    def settings(self):
 
        settings = [
 
            {
 
                "name": "service",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "PAM service name to use for authentication",
 
                "default": "login",
 
                "formname": "PAM service name"
 
            },
 
            {
 
                "name": "gecos",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Regex for extracting user name/email etc "
 
                               "from Unix userinfo",
 
                "default": "(?P<last_name>.+),\s*(?P<first_name>\w+)",
 
                "default": r"(?P<last_name>.+),\s*(?P<first_name>\w+)",
 
                "formname": "Gecos Regex"
 
            }
 
        ]
 
        return settings
 

	
 
    def use_fake_password(self):
 
        return True
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        if not username:
 
            log.debug('Empty username - skipping...')
 
            return None
 
        if username not in _auth_cache:
 
            # Need lock here, as PAM authentication is not thread safe
 
            _pam_lock.acquire()
 
            try:
 
                auth_result = pam_authenticate(username, password,
 
                                               settings["service"])
 
                # cache result only if we properly authenticated
 
                if auth_result:
 
                    _auth_cache[username] = time.time()
 
            finally:
 
                _pam_lock.release()
 

	
 
            if not auth_result:
 
                log.error("PAM was unable to authenticate user: %s", username)
 
                return None
 
        else:
 
            log.debug("Using cached auth for user: %s", username)
 

	
 
        # old attrs fetched from Kallithea database
 
        admin = getattr(userobj, 'admin', False)
 
        email = getattr(userobj, 'email', '') or "%s@%s" % (username, socket.gethostname())
 
        firstname = getattr(userobj, 'firstname', '')
 
        lastname = getattr(userobj, 'lastname', '')
 

	
 
        user_data = {
 
            'username': username,
 
            'firstname': firstname,
 
            'lastname': lastname,
 
            'groups': [g.gr_name for g in grp.getgrall() if username in g.gr_mem],
 
            'email': email,
 
            'admin': admin,
 
            'extern_name': username,
 
        }
 

	
 
        try:
 
            user_pw_data = pwd.getpwnam(username)
 
            regex = settings["gecos"]
 
            match = re.search(regex, user_pw_data.pw_gecos)
 
            if match:
 
                user_data["firstname"] = match.group('first_name')
 
                user_data["lastname"] = match.group('last_name')
 
        except Exception:
 
            log.warning("Cannot extract additional info for PAM user %s", username)
 
            pass
 

	
 
        log.debug("pamuser: \n%s", formatted_json(user_data))
 
        log.info('user %s authenticated correctly', user_data['username'])
 
        return user_data
 

	
 
    def get_managed_fields(self):
 
        return ['username', 'password']
kallithea/lib/diffs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.diffs
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Set of diffing helpers, previously part of vcs
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 4, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import difflib
 
import logging
 
import re
 

	
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import safe_unicode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode, SubModuleNode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _safe_id(idstring):
 
    """Make a string safe for including in an id attribute.
 
    r"""Make a string safe for including in an id attribute.
 

	
 
    The HTML spec says that id attributes 'must begin with
 
    a letter ([A-Za-z]) and may be followed by any number
 
    of letters, digits ([0-9]), hyphens ("-"), underscores
 
    ("_"), colons (":"), and periods (".")'. These regexps
 
    are slightly over-zealous, in that they remove colons
 
    and periods unnecessarily.
 

	
 
    Whitespace is transformed into underscores, and then
 
    anything which is not a hyphen or a character that
 
    matches \w (alphanumerics and underscore) is removed.
 

	
 
    """
 
    # Transform all whitespace to underscore
 
    idstring = re.sub(r'\s', "_", idstring)
 
    # Remove everything that is not a hyphen or a member of \w
 
    idstring = re.sub(r'(?!-)\W', "", idstring).lower()
 
    return idstring
 

	
 

	
 
def as_html(table_class='code-difftable', line_class='line',
 
            old_lineno_class='lineno old', new_lineno_class='lineno new',
 
            no_lineno_class='lineno',
 
            code_class='code', enable_comments=False, parsed_lines=None):
 
    """
 
    Return given diff as html table with customized css classes
 
    """
 
    def _link_to_if(condition, label, url):
 
        """
 
        Generates a link if condition is meet or just the label if not.
 
        """
 

	
 
        if condition:
 
            return '''<a href="%(url)s" data-pseudo-content="%(label)s"></a>''' % {
 
                'url': url,
 
                'label': label
 
            }
 
        else:
 
            return label
 

	
 
    _html_empty = True
 
    _html = []
 
    _html.append('''<table class="%(table_class)s">\n''' % {
 
        'table_class': table_class
 
    })
 

	
 
    for diff in parsed_lines:
 
        for line in diff['chunks']:
 
            _html_empty = False
 
            for change in line:
 
                _html.append('''<tr class="%(lc)s %(action)s">\n''' % {
 
                    'lc': line_class,
 
                    'action': change['action']
 
                })
 
                anchor_old_id = ''
 
                anchor_new_id = ''
 
                anchor_old = "%(filename)s_o%(oldline_no)s" % {
 
                    'filename': _safe_id(diff['filename']),
 
                    'oldline_no': change['old_lineno']
 
                }
 
                anchor_new = "%(filename)s_n%(oldline_no)s" % {
 
                    'filename': _safe_id(diff['filename']),
 
                    'oldline_no': change['new_lineno']
 
                }
 
                cond_old = (change['old_lineno'] != '...' and
 
                            change['old_lineno'])
 
                cond_new = (change['new_lineno'] != '...' and
 
                            change['new_lineno'])
 
                no_lineno = (change['old_lineno'] == '...' and
 
                             change['new_lineno'] == '...')
 
                if cond_old:
 
                    anchor_old_id = 'id="%s"' % anchor_old
 
                if cond_new:
 
                    anchor_new_id = 'id="%s"' % anchor_new
 
                ###########################################################
 
                # OLD LINE NUMBER
 
                ###########################################################
 
                _html.append('''\t<td %(a_id)s class="%(olc)s" %(colspan)s>''' % {
 
                    'a_id': anchor_old_id,
 
                    'olc': no_lineno_class if no_lineno else old_lineno_class,
 
                    'colspan': 'colspan="2"' if no_lineno else ''
 
                })
 

	
 
                _html.append('''%(link)s''' % {
 
                    'link': _link_to_if(not no_lineno, change['old_lineno'],
 
                                        '#%s' % anchor_old)
 
                })
 
                _html.append('''</td>\n''')
 
                ###########################################################
 
                # NEW LINE NUMBER
 
                ###########################################################
 

	
 
                if not no_lineno:
 
                    _html.append('''\t<td %(a_id)s class="%(nlc)s">''' % {
 
                        'a_id': anchor_new_id,
 
                        'nlc': new_lineno_class
 
                    })
 

	
 
                    _html.append('''%(link)s''' % {
 
                        'link': _link_to_if(True, change['new_lineno'],
 
                                            '#%s' % anchor_new)
 
                    })
 
                    _html.append('''</td>\n''')
 
                ###########################################################
 
                # CODE
 
                ###########################################################
 
                comments = '' if enable_comments else 'no-comment'
 
                _html.append('''\t<td class="%(cc)s %(inc)s">''' % {
 
                    'cc': code_class,
 
                    'inc': comments
 
                })
 
                _html.append('''\n\t\t<div class="add-bubble"><div>&nbsp;</div></div><pre>%(code)s</pre>\n''' % {
 
                    'code': change['line']
 
                })
 

	
 
                _html.append('''\t</td>''')
 
                _html.append('''\n</tr>\n''')
 
    _html.append('''</table>''')
 
    if _html_empty:
 
        return None
 
    return ''.join(_html)
 

	
 

	
 
def wrap_to_table(html):
 
    """Given a string with html, return it wrapped in a table, similar to what
 
    DiffProcessor returns."""
 
    return '''\
 
              <table class="code-difftable">
 
                <tr class="line no-comment">
 
                <td class="lineno new"></td>
 
                <td class="code no-comment"><pre>%s</pre></td>
 
                </tr>
 
              </table>''' % html
 

	
 

	
 
def wrapped_diff(filenode_old, filenode_new, diff_limit=None,
 
                ignore_whitespace=True, line_context=3,
 
                enable_comments=False):
 
    """
 
    Returns a file diff wrapped into a table.
 
    Checks for diff_limit and presents a message if the diff is too big.
 
    """
 
    if filenode_old is None:
 
        filenode_old = FileNode(filenode_new.path, '', EmptyChangeset())
 

	
 
    op = None
 
    a_path = filenode_old.path # default, might be overriden by actual rename in diff
 
    if filenode_old.is_binary or filenode_new.is_binary:
 
        html_diff = wrap_to_table(_('Binary file'))
 
        stats = (0, 0)
 

	
 
    elif diff_limit != -1 and (
 
            diff_limit is None or
 
            (filenode_old.size < diff_limit and filenode_new.size < diff_limit)):
 

	
 
        raw_diff = get_gitdiff(filenode_old, filenode_new,
 
                                ignore_whitespace=ignore_whitespace,
 
                                context=line_context)
 
        diff_processor = DiffProcessor(raw_diff)
 
        if diff_processor.parsed: # there should be exactly one element, for the specified file
 
            f = diff_processor.parsed[0]
 
            op = f['operation']
 
            a_path = f['old_filename']
 

	
 
        html_diff = as_html(parsed_lines=diff_processor.parsed, enable_comments=enable_comments)
 
        stats = diff_processor.stat()
 

	
 
    else:
 
        html_diff = wrap_to_table(_('Changeset was too big and was cut off, use '
 
                               'diff menu to display this diff'))
 
        stats = (0, 0)
 

	
 
    if not html_diff:
 
        submodules = filter(lambda o: isinstance(o, SubModuleNode),
 
                            [filenode_new, filenode_old])
 
        if submodules:
 
            html_diff = wrap_to_table(h.escape('Submodule %r' % submodules[0]))
 
        else:
 
            html_diff = wrap_to_table(_('No changes detected'))
 

	
 
    cs1 = filenode_old.changeset.raw_id
 
    cs2 = filenode_new.changeset.raw_id
 

	
 
    return cs1, cs2, a_path, html_diff, stats, op
 

	
 

	
 
def get_gitdiff(filenode_old, filenode_new, ignore_whitespace=True, context=3):
 
    """
 
    Returns git style diff between given ``filenode_old`` and ``filenode_new``.
 
    """
 
    # make sure we pass in default context
 
    context = context or 3
 
    submodules = filter(lambda o: isinstance(o, SubModuleNode),
 
                        [filenode_new, filenode_old])
 
    if submodules:
 
        return ''
 

	
 
    for filenode in (filenode_old, filenode_new):
 
        if not isinstance(filenode, FileNode):
 
            raise VCSError("Given object should be FileNode object, not %s"
 
                % filenode.__class__)
 

	
 
    repo = filenode_new.changeset.repository
 
    old_raw_id = getattr(filenode_old.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 
    new_raw_id = getattr(filenode_new.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 

	
 
    vcs_gitdiff = get_diff(repo, old_raw_id, new_raw_id, filenode_new.path,
 
                           ignore_whitespace, context)
 
    return vcs_gitdiff
 

	
 

	
 
def get_diff(scm_instance, rev1, rev2, path=None, ignore_whitespace=False, context=3):
 
    """
 
    A thin wrapper around vcs lib get_diff.
 
    """
 
    try:
 
        return scm_instance.get_diff(rev1, rev2, path=path,
 
                                     ignore_whitespace=ignore_whitespace, context=context)
 
    except MemoryError:
 
        h.flash('MemoryError: Diff is too big', category='error')
 
        return ''
 

	
 

	
 
NEW_FILENODE = 1
 
DEL_FILENODE = 2
 
MOD_FILENODE = 3
 
RENAMED_FILENODE = 4
 
COPIED_FILENODE = 5
 
CHMOD_FILENODE = 6
 
BIN_FILENODE = 7
 

	
 

	
 
class DiffProcessor(object):
 
    """
 
    Give it a unified or git diff and it returns a list of the files that were
 
    mentioned in the diff together with a dict of meta information that
 
    can be used to render it in a HTML template.
 
    """
 
    _diff_git_re = re.compile('^diff --git', re.MULTILINE)
 

	
 
    def __init__(self, diff, vcs='hg', diff_limit=None, inline_diff=True):
 
        """
 
        :param diff:   a text in diff format
 
        :param vcs: type of version control hg or git
 
        :param diff_limit: define the size of diff that is considered "big"
 
            based on that parameter cut off will be triggered, set to None
 
            to show full diff
 
        """
 
        if not isinstance(diff, basestring):
 
            raise Exception('Diff must be a basestring got %s instead' % type(diff))
 

	
 
        self._diff = diff
 
        self.adds = 0
 
        self.removes = 0
 
        self.diff_limit = diff_limit
 
        self.limited_diff = False
 
        self.vcs = vcs
 
        self.parsed = self._parse_gitdiff(inline_diff=inline_diff)
 

	
 
    def _parse_gitdiff(self, inline_diff):
 
        """Parse self._diff and return a list of dicts with meta info and chunks for each file.
 
        Might set limited_diff.
 
        Optionally, do an extra pass and to extra markup of one-liner changes.
 
        """
 
        _files = [] # list of dicts with meta info and chunks
 

	
 
        starts = [m.start() for m in self._diff_git_re.finditer(self._diff)]
 
        starts.append(len(self._diff))
 

	
 
        for start, end in zip(starts, starts[1:]):
 
            if self.diff_limit and end > self.diff_limit:
 
                self.limited_diff = True
 
                continue
 

	
 
            head, diff_lines = _get_header(self.vcs, buffer(self._diff, start, end - start))
 

	
 
            op = None
 
            stats = {
 
                'added': 0,
 
                'deleted': 0,
 
                'binary': False,
 
                'ops': {},
 
            }
 

	
 
            if head['deleted_file_mode']:
 
                op = 'removed'
 
                stats['binary'] = True
 
                stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
            elif head['new_file_mode']:
 
                op = 'added'
 
                stats['binary'] = True
 
                stats['ops'][NEW_FILENODE] = 'new file %s' % head['new_file_mode']
 
            else:  # modify operation, can be cp, rename, chmod
 
                # CHMOD
 
                if head['new_mode'] and head['old_mode']:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][CHMOD_FILENODE] = ('modified file chmod %s => %s'
 
                                        % (head['old_mode'], head['new_mode']))
 
                # RENAME
 
                if (head['rename_from'] and head['rename_to']
 
                      and head['rename_from'] != head['rename_to']):
 
                    op = 'renamed'
 
                    stats['binary'] = True
 
                    stats['ops'][RENAMED_FILENODE] = ('file renamed from %s to %s'
 
                                    % (head['rename_from'], head['rename_to']))
 
                # COPY
 
                if head.get('copy_from') and head.get('copy_to'):
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][COPIED_FILENODE] = ('file copied from %s to %s'
 
                                        % (head['copy_from'], head['copy_to']))
 
                # FALL BACK: detect missed old style add or remove
 
                if op is None:
 
                    if not head['a_file'] and head['b_file']:
 
                        op = 'added'
 
                        stats['binary'] = True
 
                        stats['ops'][NEW_FILENODE] = 'new file'
 

	
 
                    elif head['a_file'] and not head['b_file']:
 
                        op = 'removed'
 
                        stats['binary'] = True
 
                        stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
                # it's not ADD not DELETE
 
                if op is None:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 

	
 
            # a real non-binary diff
 
            if head['a_file'] or head['b_file']:
 
                chunks, added, deleted = _parse_lines(diff_lines)
 
                stats['binary'] = False
 
                stats['added'] = added
 
                stats['deleted'] = deleted
 
                # explicit mark that it's a modified file
 
                if op == 'modified':
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 
            else:  # Git binary patch (or empty diff)
 
                # Git binary patch
 
                if head['bin_patch']:
 
                    stats['ops'][BIN_FILENODE] = 'binary diff not shown'
 
                chunks = []
 

	
 
            if op == 'removed' and chunks:
 
                # a way of seeing deleted content could perhaps be nice - but
 
                # not with the current UI
 
                chunks = []
 

	
 
            chunks.insert(0, [{
 
                'old_lineno': '',
 
                'new_lineno': '',
 
                'action':     'context',
 
                'line':       msg,
 
                } for _op, msg in stats['ops'].iteritems()
 
                  if _op not in [MOD_FILENODE]])
 

	
 
            _files.append({
 
                'old_filename':     head['a_path'],
 
                'filename':         head['b_path'],
 
                'old_revision':     head['a_blob_id'],
 
                'new_revision':     head['b_blob_id'],
 
                'chunks':           chunks,
 
                'operation':        op,
 
                'stats':            stats,
 
            })
 

	
 
        if not inline_diff:
 
            return _files
 

	
 
        # highlight inline changes when one del is followed by one add
 
        for diff_data in _files:
 
            for chunk in diff_data['chunks']:
 
                lineiter = iter(chunk)
 
                try:
 
                    peekline = lineiter.next()
 
                    while True:
 
                        # find a first del line
 
                        while peekline['action'] != 'del':
 
                            peekline = lineiter.next()
 
                        delline = peekline
 
                        peekline = lineiter.next()
 
                        # if not followed by add, eat all following del lines
 
                        if peekline['action'] != 'add':
 
                            while peekline['action'] == 'del':
 
                                peekline = lineiter.next()
 
                            continue
 
                        # found an add - make sure it is the only one
 
                        addline = peekline
 
                        try:
 
                            peekline = lineiter.next()
 
                        except StopIteration:
 
                            # add was last line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                            raise
 
                        if peekline['action'] != 'add':
 
                            # there was only one add line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                except StopIteration:
 
                    pass
 

	
 
        return _files
 

	
 
    def stat(self):
 
        """
 
        Returns tuple of added, and removed lines for this instance
 
        """
 
        return self.adds, self.removes
 

	
 

	
 
_escape_re = re.compile(r'(&)|(<)|(>)|(\t)|(\r)|(?<=.)( \n| $)')
 

	
 

	
 
def _escaper(string):
 
    """
 
    Do HTML escaping/markup
 
    """
 

	
 
    def substitute(m):
 
        groups = m.groups()
 
        if groups[0]:
 
            return '&amp;'
 
        if groups[1]:
 
            return '&lt;'
 
        if groups[2]:
 
            return '&gt;'
 
        if groups[3]:
 
            return '<u>\t</u>'
 
        if groups[4]:
 
            return '<u class="cr"></u>'
 
        if groups[5]:
 
            return ' <i></i>'
 
        assert False
 

	
 
    return _escape_re.sub(substitute, safe_unicode(string))
 

	
 

	
 
_git_header_re = re.compile(r"""
 
    ^diff[ ]--git[ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%\n
 
       ^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
_hg_header_re = re.compile(r"""
 
    ^diff[ ]--git[ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%(?:\n|$))?
 
    (?:^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^copy[ ]from[ ](?P<copy_from>.+)\n
 
       ^copy[ ]to[ ](?P<copy_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
def _get_header(vcs, diff_chunk):
 
    """
 
    Parses a Git diff for a single file (header and chunks) and returns a tuple with:
 

	
 
    1. A dict with meta info:
 

	
 
        a_path, b_path, similarity_index, rename_from, rename_to,
 
        old_mode, new_mode, new_file_mode, deleted_file_mode,
 
        a_blob_id, b_blob_id, b_mode, a_file, b_file
 

	
 
    2. An iterator yielding lines with simple HTML markup.
 
    """
 
    match = None
 
    if vcs == 'git':
 
        match = _git_header_re.match(diff_chunk)
 
    elif vcs == 'hg':
 
        match = _hg_header_re.match(diff_chunk)
 
    if match is None:
 
        raise Exception('diff not recognized as valid %s diff' % vcs)
 
    meta_info = match.groupdict()
 
    rest = diff_chunk[match.end():]
 
    if rest and not rest.startswith('@') and not rest.startswith('literal ') and not rest.startswith('delta '):
 
        raise Exception('cannot parse %s diff header: %r followed by %r' % (vcs, diff_chunk[:match.end()], rest[:1000]))
 
    diff_lines = (_escaper(m.group(0)) for m in re.finditer(r'.*\n|.+$', rest)) # don't split on \r as str.splitlines do
 
    return meta_info, diff_lines
 

	
 

	
 
_chunk_re = re.compile(r'^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)')
 
_newline_marker = re.compile(r'^\\ No newline at end of file')
 

	
 

	
 
def _parse_lines(diff_lines):
 
    """
 
    Given an iterator of diff body lines, parse them and return a dict per
 
    line and added/removed totals.
 
    """
 
    added = deleted = 0
 
    old_line = old_end = new_line = new_end = None
 

	
 
    try:
 
        chunks = []
 
        line = diff_lines.next()
 

	
 
        while True:
 
            lines = []
 
            chunks.append(lines)
 

	
 
            match = _chunk_re.match(line)
 

	
 
            if not match:
 
                raise Exception('error parsing diff @@ line %r' % line)
 

	
 
            gr = match.groups()
 
            (old_line, old_end,
 
             new_line, new_end) = [int(x or 1) for x in gr[:-1]]
 
            old_line -= 1
 
            new_line -= 1
 

	
 
            context = len(gr) == 5
 
            old_end += old_line
 
            new_end += new_line
 

	
 
            if context:
 
                # skip context only if it's first line
 
                if int(gr[0]) > 1:
 
                    lines.append({
 
                        'old_lineno': '...',
 
                        'new_lineno': '...',
 
                        'action':     'context',
 
                        'line':       line,
 
                    })
 

	
 
            line = diff_lines.next()
 

	
 
            while old_line < old_end or new_line < new_end:
 
                if not line:
 
                    raise Exception('error parsing diff - empty line at -%s+%s' % (old_line, new_line))
 

	
 
                affects_old = affects_new = False
 

	
 
                command = line[0]
 
                if command == '+':
 
                    affects_new = True
 
                    action = 'add'
 
                    added += 1
 
                elif command == '-':
 
                    affects_old = True
 
                    action = 'del'
 
                    deleted += 1
 
                elif command == ' ':
 
                    affects_old = affects_new = True
 
                    action = 'unmod'
 
                else:
 
                    raise Exception('error parsing diff - unknown command in line %r at -%s+%s' % (line, old_line, new_line))
 

	
 
                if not _newline_marker.match(line):
 
                    old_line += affects_old
 
                    new_line += affects_new
 
                    lines.append({
 
                        'old_lineno':   affects_old and old_line or '',
 
                        'new_lineno':   affects_new and new_line or '',
 
                        'action':       action,
 
                        'line':         line[1:],
 
                    })
 

	
 
                line = diff_lines.next()
 

	
 
                if _newline_marker.match(line):
 
                    # we need to append to lines, since this is not
 
                    # counted in the line specs of diff
 
                    lines.append({
 
                        'old_lineno':   '...',
 
                        'new_lineno':   '...',
 
                        'action':       'context',
 
                        'line':         line,
 
                    })
 
                    line = diff_lines.next()
 
            if old_line > old_end:
 
                raise Exception('error parsing diff - more than %s "-" lines at -%s+%s' % (old_end, old_line, new_line))
 
            if new_line > new_end:
 
                raise Exception('error parsing diff - more than %s "+" lines at -%s+%s' % (new_end, old_line, new_line))
 
    except StopIteration:
 
        pass
 
    if old_line != old_end or new_line != new_end:
 
        raise Exception('diff processing broken when old %s<>%s or new %s<>%s line %r' % (old_line, old_end, new_line, new_end, line))
 

	
 
    return chunks, added, deleted
 

	
 
# Used for inline highlighter word split, must match the substitutions in _escaper
 
_token_re = re.compile(r'()(&amp;|&lt;|&gt;|<u>\t</u>|<u class="cr"></u>| <i></i>|\W+?)')
 

	
 

	
 
def _highlight_inline_diff(old, new):
 
    """
 
    Highlight simple add/remove in two lines given as info dicts. They are
 
    modified in place and given markup with <del>/<ins>.
 
    """
 
    assert old['action'] == 'del'
 
    assert new['action'] == 'add'
 

	
 
    oldwords = _token_re.split(old['line'])
 
    newwords = _token_re.split(new['line'])
 
    sequence = difflib.SequenceMatcher(None, oldwords, newwords)
 

	
 
    oldfragments, newfragments = [], []
 
    for tag, i1, i2, j1, j2 in sequence.get_opcodes():
 
        oldfrag = ''.join(oldwords[i1:i2])
 
        newfrag = ''.join(newwords[j1:j2])
 
        if tag != 'equal':
 
            if oldfrag:
 
                oldfrag = '<del>%s</del>' % oldfrag
 
            if newfrag:
 
                newfrag = '<ins>%s</ins>' % newfrag
 
        oldfragments.append(oldfrag)
 
        newfragments.append(newfrag)
 

	
 
    old['line'] = "".join(oldfragments)
 
    new['line'] = "".join(newfragments)
kallithea/lib/helpers.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Helper functions
 

	
 
Consists of functions to typically be used within templates, but also
 
available to Controllers. This module is available to both as 'h'.
 
"""
 
import hashlib
 
import json
 
import logging
 
import random
 
import re
 
import StringIO
 
import textwrap
 
import urlparse
 

	
 
from beaker.cache import cache_region
 
from pygments import highlight as code_highlight
 
from pygments.formatters.html import HtmlFormatter
 
from tg.i18n import ugettext as _
 
from webhelpers2.html import HTML, escape, literal
 
from webhelpers2.html.tags import NotGiven, Option, Options, _input, _make_safe_id_component, checkbox, end_form
 
from webhelpers2.html.tags import form as insecure_form
 
from webhelpers2.html.tags import hidden, link_to, password, radio
 
from webhelpers2.html.tags import select as webhelpers2_select
 
from webhelpers2.html.tags import submit, text, textarea
 
from webhelpers2.number import format_byte_size
 
from webhelpers2.text import chop_at, truncate, wrap_paragraphs
 
from webhelpers.pylonslib import Flash as _Flash
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib.annotate import annotate_highlight
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel
 
from kallithea.lib.markup_renderer import url_re
 
from kallithea.lib.pygmentsutils import get_custom_lexer
 
from kallithea.lib.utils2 import MENTIONS_REGEX, AttributeDict
 
from kallithea.lib.utils2 import age as _age
 
from kallithea.lib.utils2 import credentials_filter, safe_int, safe_str, safe_unicode, str2bool, time_to_datetime
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_email, author_name
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def canonical_url(*args, **kargs):
 
    '''Like url(x, qualified=True), but returns url that not only is qualified
 
    but also canonical, as configured in canonical_url'''
 
    from kallithea import CONFIG
 
    try:
 
        parts = CONFIG.get('canonical_url', '').split('://', 1)
 
        kargs['host'] = parts[1]
 
        kargs['protocol'] = parts[0]
 
    except IndexError:
 
        kargs['qualified'] = True
 
    return url(*args, **kargs)
 

	
 

	
 
def canonical_hostname():
 
    '''Return canonical hostname of system'''
 
    from kallithea import CONFIG
 
    try:
 
        parts = CONFIG.get('canonical_url', '').split('://', 1)
 
        return parts[1].split('/', 1)[0]
 
    except IndexError:
 
        parts = url('home', qualified=True).split('://', 1)
 
        return parts[1].split('/', 1)[0]
 

	
 

	
 
def html_escape(s):
 
    """Return string with all html escaped.
 
    This is also safe for javascript in html but not necessarily correct.
 
    """
 
    return (s
 
        .replace('&', '&amp;')
 
        .replace(">", "&gt;")
 
        .replace("<", "&lt;")
 
        .replace('"', "&quot;")
 
        .replace("'", "&apos;") # Note: this is HTML5 not HTML4 and might not work in mails
 
        )
 

	
 
def js(value):
 
    """Convert Python value to the corresponding JavaScript representation.
 

	
 
    This is necessary to safely insert arbitrary values into HTML <script>
 
    sections e.g. using Mako template expression substitution.
 

	
 
    Note: Rather than using this function, it's preferable to avoid the
 
    insertion of values into HTML <script> sections altogether. Instead,
 
    data should (to the extent possible) be passed to JavaScript using
 
    data attributes or AJAX calls, eliminating the need for JS specific
 
    escaping.
 

	
 
    Note: This is not safe for use in attributes (e.g. onclick), because
 
    quotes are not escaped.
 

	
 
    Because the rules for parsing <script> varies between XHTML (where
 
    normal rules apply for any special characters) and HTML (where
 
    entities are not interpreted, but the literal string "</script>"
 
    is forbidden), the function ensures that the result never contains
 
    '&', '<' and '>', thus making it safe in both those contexts (but
 
    not in attributes).
 
    """
 
    return literal(
 
        ('(' + json.dumps(value) + ')')
 
        # In JSON, the following can only appear in string literals.
 
        .replace('&', r'\x26')
 
        .replace('<', r'\x3c')
 
        .replace('>', r'\x3e')
 
    )
 

	
 

	
 
def jshtml(val):
 
    """HTML escapes a string value, then converts the resulting string
 
    to its corresponding JavaScript representation (see `js`).
 

	
 
    This is used when a plain-text string (possibly containing special
 
    HTML characters) will be used by a script in an HTML context (e.g.
 
    element.innerHTML or jQuery's 'html' method).
 

	
 
    If in doubt, err on the side of using `jshtml` over `js`, since it's
 
    better to escape too much than too little.
 
    """
 
    return js(escape(val))
 

	
 

	
 
def shorter(s, size=20, firstline=False, postfix='...'):
 
    """Truncate s to size, including the postfix string if truncating.
 
    If firstline, truncate at newline.
 
    """
 
    if firstline:
 
        s = s.split('\n', 1)[0].rstrip()
 
    if len(s) > size:
 
        return s[:size - len(postfix)] + postfix
 
    return s
 

	
 

	
 
def reset(name, value, id=NotGiven, **attrs):
 
    """Create a reset button, similar to webhelpers2.html.tags.submit ."""
 
    return _input("reset", name, value, id, attrs)
 

	
 

	
 
def select(name, selected_values, options, id=NotGiven, **attrs):
 
    """Convenient wrapper of webhelpers2 to let it accept options as a tuple list"""
 
    if isinstance(options, list):
 
        l = []
 
        for x in options:
 
            try:
 
                value, label = x
 
            except ValueError: # too many values to unpack
 
                if isinstance(x, basestring):
 
                    value = label = x
 
                else:
 
                    log.error('invalid select option %r', x)
 
                    raise
 
            l.append(Option(label, value))
 
        options = Options(l)
 
    return webhelpers2_select(name, selected_values, options, id=id, **attrs)
 

	
 

	
 
safeid = _make_safe_id_component
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a unique ID for filenode based on it's hash of path and revision
 
    it's safe to use in urls
 

	
 
    :param raw_id:
 
    :param path:
 
    """
 

	
 
    return 'C-%s-%s' % (short_id(raw_id), hashlib.md5(safe_str(path)).hexdigest()[:12])
 

	
 

	
 
class _FilesBreadCrumbs(object):
 

	
 
    def __call__(self, repo_name, rev, paths):
 
        if isinstance(paths, str):
 
            paths = safe_unicode(paths)
 
        url_l = [link_to(repo_name, url('files_home',
 
                                        repo_name=repo_name,
 
                                        revision=rev, f_path=''),
 
                         class_='ypjax-link')]
 
        paths_l = paths.split('/')
 
        for cnt, p in enumerate(paths_l):
 
            if p != '':
 
                url_l.append(link_to(p,
 
                                     url('files_home',
 
                                         repo_name=repo_name,
 
                                         revision=rev,
 
                                         f_path='/'.join(paths_l[:cnt + 1])
 
                                         ),
 
                                     class_='ypjax-link'
 
                                     )
 
                             )
 

	
 
        return literal('/'.join(url_l))
 

	
 

	
 
files_breadcrumbs = _FilesBreadCrumbs()
 

	
 

	
 
class CodeHtmlFormatter(HtmlFormatter):
 
    """
 
    My code Html Formatter for source codes
 
    """
 

	
 
    def wrap(self, source, outfile):
 
        return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
 

	
 
    def _wrap_code(self, source):
 
        for cnt, it in enumerate(source):
 
            i, t = it
 
            t = '<span id="L%s">%s</span>' % (cnt + 1, t)
 
            yield i, t
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        dummyoutfile = StringIO.StringIO()
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            dummyoutfile.write(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        nocls = self.noclasses
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s%d" class="special">%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        # in case you wonder about the seemingly redundant <div> here: since the
 
        # content in the other cell also is wrapped in a div, some browsers in
 
        # some configurations seem to mess up the formatting...
 
        if nocls:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        else:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td class="linenos"><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        yield 0, dummyoutfile.getvalue()
 
        yield 0, '</td></tr></table>'
 

	
 

	
 
_whitespace_re = re.compile(r'(\t)|( )(?=\n|</div>)')
 

	
 

	
 
def _markup_whitespace(m):
 
    groups = m.groups()
 
    if groups[0]:
 
        return '<u>\t</u>'
 
    if groups[1]:
 
        return ' <i></i>'
 

	
 

	
 
def markup_whitespace(s):
 
    return _whitespace_re.sub(_markup_whitespace, s)
 

	
 

	
 
def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(markup_whitespace(
 
        code_highlight(filenode.content, lexer, CodeHtmlFormatter(**kwargs))))
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 

	
 
    color_dict = {}
 

	
 
    def gen_color(n=10000):
 
        """generator for getting n of evenly distributed colors using
 
        hsv color and golden ratio. It always return same order of colors
 

	
 
        :returns: RGB tuple
 
        """
 

	
 
        def hsv_to_rgb(h, s, v):
 
            if s == 0.0:
 
                return v, v, v
 
            i = int(h * 6.0)  # XXX assume int() truncates!
 
            f = (h * 6.0) - i
 
            p = v * (1.0 - s)
 
            q = v * (1.0 - s * f)
 
            t = v * (1.0 - s * (1.0 - f))
 
            i = i % 6
 
            if i == 0:
 
                return v, t, p
 
            if i == 1:
 
                return q, v, p
 
            if i == 2:
 
                return p, v, t
 
            if i == 3:
 
                return p, q, v
 
            if i == 4:
 
                return t, p, v
 
            if i == 5:
 
                return v, p, q
 

	
 
        golden_ratio = 0.618033988749895
 
        h = 0.22717784590367374
 

	
 
        for _unused in xrange(n):
 
            h += golden_ratio
 
            h %= 1
 
            HSV_tuple = [h, 0.95, 0.95]
 
            RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
            yield map(lambda x: str(int(x * 256)), RGB_tuple)
 

	
 
    cgenerator = gen_color()
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = cgenerator.next()
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(repo_name):
 

	
 
        def _url_func(changeset):
 
            author = escape(changeset.author)
 
            date = changeset.date
 
            message = escape(changeset.message)
 
            tooltip_html = ("<b>Author:</b> %s<br/>"
 
                            "<b>Date:</b> %s</b><br/>"
 
                            "<b>Message:</b> %s") % (author, date, message)
 

	
 
            lnk_format = show_id(changeset)
 
            uri = link_to(
 
                    lnk_format,
 
                    url('changeset_home', repo_name=repo_name,
 
                        revision=changeset.raw_id),
 
                    style=get_color_string(changeset.raw_id),
 
                    **{'data-toggle': 'popover',
 
                       'data-content': tooltip_html}
 
                  )
 

	
 
            uri += '\n'
 
            return uri
 
        return _url_func
 

	
 
    return literal(markup_whitespace(annotate_highlight(filenode, url_func(repo_name), **kwargs)))
 

	
 

	
 
class _Message(object):
 
    """A message returned by ``Flash.pop_messages()``.
 

	
 
    Converting the message to a string returns the message text. Instances
 
    also have the following attributes:
 

	
 
    * ``message``: the message text.
 
    * ``category``: the category specified when the message was created.
 
    """
 

	
 
    def __init__(self, category, message):
 
        self.category = category
 
        self.message = message
 

	
 
    def __str__(self):
 
        return self.message
 

	
 
    __unicode__ = __str__
 

	
 
    def __html__(self):
 
        return escape(safe_unicode(self.message))
 

	
 

	
 
class Flash(_Flash):
 

	
 
    def __call__(self, message, category=None, ignore_duplicate=False, logf=None):
 
        """
 
        Show a message to the user _and_ log it through the specified function
 

	
 
        category: notice (default), warning, error, success
 
        logf: a custom log function - such as log.debug
 

	
 
        logf defaults to log.info, unless category equals 'success', in which
 
        case logf defaults to log.debug.
 
        """
 
        if logf is None:
 
            logf = log.info
 
            if category == 'success':
 
                logf = log.debug
 

	
 
        logf('Flash %s: %s', category, message)
 

	
 
        super(Flash, self).__call__(message, category, ignore_duplicate)
 

	
 
    def pop_messages(self):
 
        """Return all accumulated messages and delete them from the session.
 

	
 
        The return value is a list of ``Message`` objects.
 
        """
 
        from tg import session
 
        messages = session.pop(self.session_key, [])
 
        session.save()
 
        return [_Message(*m) for m in messages]
 

	
 

	
 
flash = Flash()
 

	
 

	
 
age = lambda x, y=False: _age(x, y)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    from kallithea import CONFIG
 
    def_len = safe_int(CONFIG.get('show_sha_length', 12))
 
    show_rev = str2bool(CONFIG.get('show_revision_number', False))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return raw_id
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        return date.strftime("%Y-%m-%d %H:%M:%S").decode('utf-8')
 

	
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
@cache_region('long_term', 'user_or_none')
 
def user_or_none(author):
 
    """Try to match email part of VCS committer string with a local user - or return None"""
 
    from kallithea.model.db import User
 
    email = author_email(author)
 
    if email:
 
        return User.get_by_email(email, cache=True) # cache will only use sql_cache_short
 
    return None
 

	
 

	
 
def email_or_none(author):
 
    """Try to match email part of VCS committer string with a local user.
 
    Return primary email of user, email part of the specified author name, or None."""
 
    if not author:
 
        return None
 
    user = user_or_none(author)
 
    if user is not None:
 
        return user.email # always use main email address - not necessarily the one used to find user
 

	
 
    # extract email from the commit string
 
    email = author_email(author)
 
    if email:
 
        return email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author, show_attr="username"):
 
    """Find the user identified by 'author', return one of the users attributes,
 
    default to the username attribute, None if there is no user"""
 
    from kallithea.model.db import User
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # if author is already an instance use it for extraction
 
    if isinstance(author, User):
 
        return person_getter(author)
 

	
 
    user = user_or_none(author)
 
    if user is not None:
 
        return person_getter(user)
 

	
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return author_name(author) or email(author)
 

	
 

	
 
def person_by_id(id_, show_attr="username"):
 
    from kallithea.model.db import User
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = User.get(id_)
 
        if user is not None:
 
            return person_getter(user)
 
    return id_
 

	
 

	
 
def boolicon(value):
 
    """Returns boolean value of a value, represented as small html image of true/false
 
    icons
 

	
 
    :param value: value
 
    """
 

	
 
    if value:
 
        return HTML.tag('i', class_="icon-ok")
 
    else:
 
        return HTML.tag('i', class_="icon-minus-circled")
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        def lnk(rev, repo_name):
 
            lazy_cs = False
 
            title_ = None
 
            url_ = '#'
 
            if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
 
                if rev.op and rev.ref_name:
 
                    if rev.op == 'delete_branch':
 
                        lbl = _('Deleted branch: %s') % rev.ref_name
 
                    elif rev.op == 'tag':
 
                        lbl = _('Created tag: %s') % rev.ref_name
 
                    else:
 
                        lbl = 'Unknown operation %s' % rev.op
 
                else:
 
                    lazy_cs = True
 
                    lbl = rev.short_id[:8]
 
                    url_ = url('changeset_home', repo_name=repo_name,
 
                               revision=rev.raw_id)
 
            else:
 
                # changeset cannot be found - it might have been stripped or removed
 
                lbl = rev[:12]
 
                title_ = _('Changeset %s not found') % lbl
 
            if parse_cs:
 
                return link_to(lbl, url_, title=title_, **{'data-toggle': 'tooltip'})
 
            return link_to(lbl, url_, class_='lazy-cs' if lazy_cs else '',
 
                           **{'data-raw_id': rev.raw_id, 'data-repo_name': repo_name})
 

	
 
        def _get_op(rev_txt):
 
            _op = None
 
            _name = rev_txt
 
            if len(rev_txt.split('=>')) == 2:
 
                _op, _name = rev_txt.split('=>')
 
            return _op, _name
 

	
 
        revs = []
 
        if len(filter(lambda v: v != '', revs_ids)) > 0:
 
            repo = None
 
            for rev in revs_ids[:revs_top_limit]:
 
                _op, _name = _get_op(rev)
 

	
 
                # we want parsed changesets, or new log store format is bad
 
                if parse_cs:
 
                    try:
 
                        if repo is None:
 
                            repo = user_log.repository.scm_instance
 
                        _rev = repo.get_changeset(rev)
 
                        revs.append(_rev)
 
                    except ChangesetDoesNotExistError:
 
                        log.error('cannot find revision %s in this repo', rev)
 
                        revs.append(rev)
 
                else:
 
                    _rev = AttributeDict({
 
                        'short_id': rev[:12],
 
                        'raw_id': rev,
 
                        'message': '',
 
                        'op': _op,
 
                        'ref_name': _name
 
                    })
 
                    revs.append(_rev)
 
        cs_links = [" " + ', '.join(
 
            [lnk(rev, repo_name) for rev in revs[:revs_limit]]
 
        )]
 
        _op1, _name1 = _get_op(revs_ids[0])
 
        _op2, _name2 = _get_op(revs_ids[-1])
 

	
 
        _rev = '%s...%s' % (_name1, _name2)
 

	
 
        compare_view = (
 
            ' <div class="compare_view" data-toggle="tooltip" title="%s">'
 
            '<a href="%s">%s</a> </div>' % (
 
                _('Show all combined changesets %s->%s') % (
 
                    revs_ids[0][:12], revs_ids[-1][:12]
 
                ),
 
                url('changeset_home', repo_name=repo_name,
 
                    revision=_rev
 
                ),
 
                _('Compare view')
 
            )
 
        )
 

	
 
        # if we have exactly one more than normally displayed
 
        # just display it, takes less space than displaying
 
        # "and 1 more revisions"
 
        if len(revs_ids) == revs_limit + 1:
 
            cs_links.append(", " + lnk(revs[revs_limit], repo_name))
 

	
 
        # hidden-by-default ones
 
        if len(revs_ids) > revs_limit + 1:
 
            uniq_id = revs_ids[0]
 
            html_tmpl = (
 
                '<span> %s <a class="show_more" id="_%s" '
 
                'href="#more">%s</a> %s</span>'
 
            )
 
            if not feed:
 
                cs_links.append(html_tmpl % (
 
                      _('and'),
 
                      uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
 
                      _('revisions')
 
                    )
 
                )
 

	
 
            if not feed:
 
                html_tmpl = '<span id="%s" style="display:none">, %s </span>'
 
            else:
 
                html_tmpl = '<span id="%s"> %s </span>'
 

	
 
            morelinks = ', '.join(
 
              [lnk(rev, repo_name) for rev in revs[revs_limit:]]
 
            )
 

	
 
            if len(revs_ids) > revs_top_limit:
 
                morelinks += ', ...'
 

	
 
            cs_links.append(html_tmpl % (uniq_id, morelinks))
 
        if len(revs) > 1:
 
            cs_links.append(compare_view)
 
        return ''.join(cs_links)
 

	
 
    def get_fork_name():
 
        repo_name = action_params
 
        url_ = url('summary_home', repo_name=repo_name)
 
        return _('Fork name %s') % link_to(action_params, url_)
 

	
 
    def get_user_name():
 
        user_name = action_params
 
        return user_name
 

	
 
    def get_users_group():
 
        group_name = action_params
 
        return group_name
 

	
 
    def get_pull_request():
 
        from kallithea.model.db import PullRequest
 
        pull_request_id = action_params
 
        nice_id = PullRequest.make_nice_id(pull_request_id)
 

	
 
        deleted = user_log.repository is None
 
        if deleted:
 
            repo_name = user_log.repository_name
 
        else:
 
            repo_name = user_log.repository.repo_name
 

	
 
        return link_to(_('Pull request %s') % nice_id,
 
                    url('pullrequest_show', repo_name=repo_name,
 
                    pull_request_id=pull_request_id))
 

	
 
    def get_archive_name():
 
        archive_name = action_params
 
        return archive_name
 

	
 
    # action : translated str, callback(extractor), icon
 
    action_map = {
 
    'user_deleted_repo':           (_('[deleted] repository'),
 
                                    None, 'icon-trashcan'),
 
    'user_created_repo':           (_('[created] repository'),
 
                                    None, 'icon-plus'),
 
    'user_created_fork':           (_('[created] repository as fork'),
 
                                    None, 'icon-fork'),
 
    'user_forked_repo':            (_('[forked] repository'),
 
                                    get_fork_name, 'icon-fork'),
 
    'user_updated_repo':           (_('[updated] repository'),
 
                                    None, 'icon-pencil'),
 
    'user_downloaded_archive':      (_('[downloaded] archive from repository'),
 
                                    get_archive_name, 'icon-download-cloud'),
 
    'admin_deleted_repo':          (_('[delete] repository'),
 
                                    None, 'icon-trashcan'),
 
    'admin_created_repo':          (_('[created] repository'),
 
                                    None, 'icon-plus'),
 
    'admin_forked_repo':           (_('[forked] repository'),
 
                                    None, 'icon-fork'),
 
    'admin_updated_repo':          (_('[updated] repository'),
 
                                    None, 'icon-pencil'),
 
    'admin_created_user':          (_('[created] user'),
 
                                    get_user_name, 'icon-user'),
 
    'admin_updated_user':          (_('[updated] user'),
 
                                    get_user_name, 'icon-user'),
 
    'admin_created_users_group':   (_('[created] user group'),
 
                                    get_users_group, 'icon-pencil'),
 
    'admin_updated_users_group':   (_('[updated] user group'),
 
                                    get_users_group, 'icon-pencil'),
 
    'user_commented_revision':     (_('[commented] on revision in repository'),
 
                                    get_cs_links, 'icon-comment'),
 
    'user_commented_pull_request': (_('[commented] on pull request for'),
 
                                    get_pull_request, 'icon-comment'),
 
    'user_closed_pull_request':    (_('[closed] pull request for'),
 
                                    get_pull_request, 'icon-ok'),
 
    'push':                        (_('[pushed] into'),
 
                                    get_cs_links, 'icon-move-up'),
 
    'push_local':                  (_('[committed via Kallithea] into repository'),
 
                                    get_cs_links, 'icon-pencil'),
 
    'push_remote':                 (_('[pulled from remote] into repository'),
 
                                    get_cs_links, 'icon-move-up'),
 
    'pull':                        (_('[pulled] from'),
 
                                    None, 'icon-move-down'),
 
    'started_following_repo':      (_('[started following] repository'),
 
                                    None, 'icon-heart'),
 
    'stopped_following_repo':      (_('[stopped following] repository'),
 
                                    None, 'icon-heart-empty'),
 
    }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0] \
 
            .replace('[', '<b>') \
 
            .replace(']', '</b>')
 

	
 
    action_params_func = lambda: ""
 

	
 
    if callable(action_str[1]):
 
        action_params_func = action_str[1]
 

	
 
    def action_parser_icon():
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        html = """<i class="%s"></i>""" % ico
 
        return literal(html)
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 

	
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 
def gravatar_div(email_address, cls='', size=30, **div_attributes):
 
    """Return an html literal with a span around a gravatar if they are enabled.
 
    Extra keyword parameters starting with 'div_' will get the prefix removed
 
    and '_' changed to '-' and be used as attributes on the div. The default
 
    class is 'gravatar'.
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 
    if 'div_class' not in div_attributes:
 
        div_attributes['div_class'] = "gravatar"
 
    attributes = []
 
    for k, v in sorted(div_attributes.items()):
 
        assert k.startswith('div_'), k
 
        attributes.append(' %s="%s"' % (k[4:].replace('_', '-'), escape(v)))
 
    return literal("""<span%s>%s</span>""" %
 
                   (''.join(attributes),
 
                    gravatar(email_address, cls=cls, size=size)))
 

	
 

	
 
def gravatar(email_address, cls='', size=30):
 
    """return html element of the gravatar
 

	
 
    This method will return an <img> with the resolution double the size (for
 
    retina screens) of the image. If the url returned from gravatar_url is
 
    empty then we fallback to using an icon.
 

	
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 

	
 
    src = gravatar_url(email_address, size * 2)
 

	
 
    if src:
 
        # here it makes sense to use style="width: ..." (instead of, say, a
 
        # stylesheet) because we using this to generate a high-res (retina) size
 
        html = ('<i class="icon-gravatar {cls}"'
 
                ' style="font-size: {size}px;background-size: {size}px;background-image: url(\'{src}\')"'
 
                '></i>').format(cls=cls, size=size, src=src)
 

	
 
    else:
 
        # if src is empty then there was no gravatar, so we use a font icon
 
        html = ("""<i class="icon-user {cls}" style="font-size: {size}px;"></i>"""
 
            .format(cls=cls, size=size, src=src))
 

	
 
    return literal(html)
 

	
 

	
 
def gravatar_url(email_address, size=30, default=''):
 
    # doh, we need to re-import those to mock it later
 
    from kallithea.config.routing import url
 
    from kallithea.model.db import User
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ""
 

	
 
    _def = 'anonymous@kallithea-scm.org'  # default gravatar
 
    email_address = email_address or _def
 

	
 
    if email_address == _def:
 
        return default
 

	
 
    parsed_url = urlparse.urlparse(url.current(qualified=True))
 
    url = (c.visual.gravatar_url or User.DEFAULT_GRAVATAR_URL ) \
 
               .replace('{email}', email_address) \
 
               .replace('{md5email}', hashlib.md5(safe_str(email_address).lower()).hexdigest()) \
 
               .replace('{netloc}', parsed_url.netloc) \
 
               .replace('{scheme}', parsed_url.scheme) \
 
               .replace('{size}', safe_str(size))
 
    return url
 

	
 

	
 
def changed_tooltip(nodes):
 
    """
 
    Generates a html string for changed nodes in changeset page.
 
    It limits the output to 30 entries
 

	
 
    :param nodes: LazyNodesGenerator
 
    """
 
    if nodes:
 
        pref = ': <br/> '
 
        suf = ''
 
        if len(nodes) > 30:
 
            suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
 
        return literal(pref + '<br/> '.join([safe_unicode(x.path)
 
                                             for x in nodes[:30]]) + suf)
 
    else:
 
        return ': ' + _('No files')
 

	
 

	
 
def fancy_file_stats(stats):
 
    """
 
    Displays a fancy two colored bar for number of added/deleted
 
    lines of code on file
 

	
 
    :param stats: two element list of added/deleted lines of code
 
    """
 
    from kallithea.lib.diffs import NEW_FILENODE, DEL_FILENODE, \
 
        MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE
 

	
 
    a, d = stats['added'], stats['deleted']
 
    width = 100
 

	
 
    if stats['binary']:
 
        # binary mode
 
        lbl = ''
 
        bin_op = 1
 

	
 
        if BIN_FILENODE in stats['ops']:
 
            lbl = 'bin+'
 

	
 
        if NEW_FILENODE in stats['ops']:
 
            lbl += _('new file')
 
            bin_op = NEW_FILENODE
 
        elif MOD_FILENODE in stats['ops']:
 
            lbl += _('mod')
 
            bin_op = MOD_FILENODE
 
        elif DEL_FILENODE in stats['ops']:
 
            lbl += _('del')
 
            bin_op = DEL_FILENODE
 
        elif RENAMED_FILENODE in stats['ops']:
 
            lbl += _('rename')
 
            bin_op = RENAMED_FILENODE
 

	
 
        # chmod can go with other operations
 
        if CHMOD_FILENODE in stats['ops']:
 
            _org_lbl = _('chmod')
 
            lbl += _org_lbl if lbl.endswith('+') else '+%s' % _org_lbl
 

	
 
        #import ipdb;ipdb.set_trace()
 
        b_d = '<div class="bin bin%s progress-bar" style="width:100%%">%s</div>' % (bin_op, lbl)
 
        b_a = '<div class="bin bin1" style="width:0%"></div>'
 
        return literal('<div style="width:%spx" class="progress">%s%s</div>' % (width, b_a, b_d))
 

	
 
    t = stats['added'] + stats['deleted']
 
    unit = float(width) / (t or 1)
 

	
 
    # needs > 9% of width to be visible or 0 to be hidden
 
    a_p = max(9, unit * a) if a > 0 else 0
 
    d_p = max(9, unit * d) if d > 0 else 0
 
    p_sum = a_p + d_p
 

	
 
    if p_sum > width:
 
        # adjust the percentage to be == 100% since we adjusted to 9
 
        if a_p > d_p:
 
            a_p = a_p - (p_sum - width)
 
        else:
 
            d_p = d_p - (p_sum - width)
 

	
 
    a_v = a if a > 0 else ''
 
    d_v = d if d > 0 else ''
 

	
 
    d_a = '<div class="added progress-bar" style="width:%s%%">%s</div>' % (
 
        a_p, a_v
 
    )
 
    d_d = '<div class="deleted progress-bar" style="width:%s%%">%s</div>' % (
 
        d_p, d_v
 
    )
 
    return literal('<div class="progress" style="width:%spx">%s%s</div>' % (width, d_a, d_d))
 

	
 

	
 
_URLIFY_RE = re.compile(r'''
 
# URL markup
 
(?P<url>%s) |
 
# @mention markup
 
(?P<mention>%s) |
 
# Changeset hash markup
 
(?<!\w|[-_])
 
  (?P<hash>[0-9a-f]{12,40})
 
(?!\w|[-_]) |
 
# Markup of *bold text*
 
(?:
 
  (?:^|(?<=\s))
 
  (?P<bold> [*] (?!\s) [^*\n]* (?<!\s) [*] )
 
  (?![*\w])
 
) |
 
# "Stylize" markup
 
\[see\ \=&gt;\ *(?P<seen>[a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\] |
 
\[license\ \=&gt;\ *(?P<license>[a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\] |
 
\[(?P<tagtype>requires|recommends|conflicts|base)\ \=&gt;\ *(?P<tagvalue>[a-zA-Z0-9\-\/]*)\] |
 
\[(?:lang|language)\ \=&gt;\ *(?P<lang>[a-zA-Z\-\/\#\+]*)\] |
 
\[(?P<tag>[a-z]+)\]
 
''' % (url_re.pattern, MENTIONS_REGEX.pattern),
 
    re.VERBOSE | re.MULTILINE | re.IGNORECASE)
 

	
 

	
 
def urlify_text(s, repo_name=None, link_=None, truncate=None, stylize=False, truncatef=truncate):
 
    """
 
    Parses given text message and make literal html with markup.
 
    The text will be truncated to the specified length.
 
    Hashes are turned into changeset links to specified repository.
 
    URLs links to what they say.
 
    Issues are linked to given issue-server.
 
    If link_ is provided, all text not already linking somewhere will link there.
 
    """
 

	
 
    def _replace(match_obj):
 
        url = match_obj.group('url')
 
        if url is not None:
 
            return '<a href="%(url)s">%(url)s</a>' % {'url': url}
 
        mention = match_obj.group('mention')
 
        if mention is not None:
 
            return '<b>%s</b>' % mention
 
        hash_ = match_obj.group('hash')
 
        if hash_ is not None and repo_name is not None:
 
            from kallithea.config.routing import url  # doh, we need to re-import url to mock it later
 
            return '<a class="changeset_hash" href="%(url)s">%(hash)s</a>' % {
 
                 'url': url('changeset_home', repo_name=repo_name, revision=hash_),
 
                 'hash': hash_,
 
                }
 
        bold = match_obj.group('bold')
 
        if bold is not None:
 
            return '<b>*%s*</b>' % _urlify(bold[1:-1])
 
        if stylize:
 
            seen = match_obj.group('seen')
 
            if seen:
 
                return '<div class="label label-meta" data-tag="see">see =&gt; %s</div>' % seen
 
            license = match_obj.group('license')
 
            if license:
 
                return '<div class="label label-meta" data-tag="license"><a href="http:\/\/www.opensource.org/licenses/%s">%s</a></div>' % (license, license)
 
                return '<div class="label label-meta" data-tag="license"><a href="http://www.opensource.org/licenses/%s">%s</a></div>' % (license, license)
 
            tagtype = match_obj.group('tagtype')
 
            if tagtype:
 
                tagvalue = match_obj.group('tagvalue')
 
                return '<div class="label label-meta" data-tag="%s">%s =&gt; <a href="/%s">%s</a></div>' % (tagtype, tagtype, tagvalue, tagvalue)
 
            lang = match_obj.group('lang')
 
            if lang:
 
                return '<div class="label label-meta" data-tag="lang">%s</div>' % lang
 
            tag = match_obj.group('tag')
 
            if tag:
 
                return '<div class="label label-meta" data-tag="%s">%s</div>' % (tag, tag)
 
        return match_obj.group(0)
 

	
 
    def _urlify(s):
 
        """
 
        Extract urls from text and make html links out of them
 
        """
 
        return _URLIFY_RE.sub(_replace, s)
 

	
 
    if truncate is None:
 
        s = s.rstrip()
 
    else:
 
        s = truncatef(s, truncate, whole_word=True)
 
    s = html_escape(s)
 
    s = _urlify(s)
 
    if repo_name is not None:
 
        s = urlify_issues(s, repo_name)
 
    if link_ is not None:
 
        # make href around everything that isn't a href already
 
        s = linkify_others(s, link_)
 
    s = s.replace('\r\n', '<br/>').replace('\n', '<br/>')
 
    # Turn HTML5 into more valid HTML4 as required by some mail readers.
 
    # (This is not done in one step in html_escape, because character codes like
 
    # &#123; risk to be seen as an issue reference due to the presence of '#'.)
 
    s = s.replace("&apos;", "&#39;")
 
    return literal(s)
 

	
 

	
 
def linkify_others(t, l):
 
    """Add a default link to html with links.
 
    HTML doesn't allow nesting of links, so the outer link must be broken up
 
    in pieces and give space for other links.
 
    """
 
    urls = re.compile(r'(\<a.*?\<\/a\>)',)
 
    links = []
 
    for e in urls.split(t):
 
        if e.strip() and not urls.match(e):
 
            links.append('<a class="message-link" href="%s">%s</a>' % (l, e))
 
        else:
 
            links.append(e)
 

	
 
    return ''.join(links)
 

	
 

	
 
# Global variable that will hold the actual urlify_issues function body.
 
# Will be set on first use when the global configuration has been read.
 
_urlify_issues_f = None
 

	
 

	
 
def urlify_issues(newtext, repo_name):
 
    """Urlify issue references according to .ini configuration"""
 
    global _urlify_issues_f
 
    if _urlify_issues_f is None:
 
        from kallithea import CONFIG
 
        from kallithea.model.db import URL_SEP
 
        assert CONFIG['sqlalchemy.url'] # make sure config has been loaded
 

	
 
        # Build chain of urlify functions, starting with not doing any transformation
 
        tmp_urlify_issues_f = lambda s: s
 

	
 
        issue_pat_re = re.compile(r'issue_pat(.*)')
 
        for k in CONFIG.keys():
 
            # Find all issue_pat* settings that also have corresponding server_link and prefix configuration
 
            m = issue_pat_re.match(k)
 
            if m is None:
 
                continue
 
            suffix = m.group(1)
 
            issue_pat = CONFIG.get(k)
 
            issue_server_link = CONFIG.get('issue_server_link%s' % suffix)
 
            issue_sub = CONFIG.get('issue_sub%s' % suffix)
 
            if not issue_pat or not issue_server_link or issue_sub is None: # issue_sub can be empty but should be present
 
                log.error('skipping incomplete issue pattern %r: %r -> %r %r', suffix, issue_pat, issue_server_link, issue_sub)
 
                continue
 

	
 
            # Wrap tmp_urlify_issues_f with substitution of this pattern, while making sure all loop variables (and compiled regexpes) are bound
 
            try:
 
                issue_re = re.compile(issue_pat)
 
            except re.error as e:
 
                log.error('skipping invalid issue pattern %r: %r -> %r %r. Error: %s', suffix, issue_pat, issue_server_link, issue_sub, str(e))
 
                continue
 

	
 
            log.debug('issue pattern %r: %r -> %r %r', suffix, issue_pat, issue_server_link, issue_sub)
 

	
 
            def issues_replace(match_obj,
 
                               issue_server_link=issue_server_link, issue_sub=issue_sub):
 
                try:
 
                    issue_url = match_obj.expand(issue_server_link)
 
                except (IndexError, re.error) as e:
 
                    log.error('invalid issue_url setting %r -> %r %r. Error: %s', issue_pat, issue_server_link, issue_sub, str(e))
 
                    issue_url = issue_server_link
 
                issue_url = issue_url.replace('{repo}', repo_name)
 
                issue_url = issue_url.replace('{repo_name}', repo_name.split(URL_SEP)[-1])
 
                # if issue_sub is empty use the matched issue reference verbatim
 
                if not issue_sub:
 
                    issue_text = match_obj.group()
 
                else:
 
                    try:
 
                        issue_text = match_obj.expand(issue_sub)
 
                    except (IndexError, re.error) as e:
 
                        log.error('invalid issue_sub setting %r -> %r %r. Error: %s', issue_pat, issue_server_link, issue_sub, str(e))
 
                        issue_text = match_obj.group()
 

	
 
                return (
 
                    '<a class="issue-tracker-link" href="%(url)s">'
 
                    '%(text)s'
 
                    '</a>'
 
                    ) % {
 
                     'url': issue_url,
 
                     'text': issue_text,
 
                    }
 
            tmp_urlify_issues_f = (lambda s,
 
                                          issue_re=issue_re, issues_replace=issues_replace, chain_f=tmp_urlify_issues_f:
 
                                   issue_re.sub(issues_replace, chain_f(s)))
 

	
 
        # Set tmp function globally - atomically
 
        _urlify_issues_f = tmp_urlify_issues_f
 

	
 
    return _urlify_issues_f(newtext)
 

	
 

	
 
def render_w_mentions(source, repo_name=None):
 
    """
 
    Render plain text with revision hashes and issue references urlified
 
    and with @mention highlighting.
 
    """
 
    s = safe_unicode(source)
 
    s = urlify_text(s, repo_name=repo_name)
 
    return literal('<div class="formatted-fixed">%s</div>' % s)
 

	
 

	
 
def short_ref(ref_type, ref_name):
 
    if ref_type == 'rev':
 
        return short_id(ref_name)
 
    return ref_name
 

	
 

	
 
def link_to_ref(repo_name, ref_type, ref_name, rev=None):
 
    """
 
    Return full markup for a href to changeset_home for a changeset.
 
    If ref_type is branch it will link to changelog.
 
    ref_name is shortened if ref_type is 'rev'.
 
    if rev is specified show it too, explicitly linking to that revision.
 
    """
 
    txt = short_ref(ref_type, ref_name)
 
    if ref_type == 'branch':
 
        u = url('changelog_home', repo_name=repo_name, branch=ref_name)
 
    else:
 
        u = url('changeset_home', repo_name=repo_name, revision=ref_name)
 
    l = link_to(repo_name + '#' + txt, u)
 
    if rev and ref_type != 'rev':
 
        l = literal('%s (%s)' % (l, link_to(short_id(rev), url('changeset_home', repo_name=repo_name, revision=rev))))
 
    return l
 

	
 

	
 
def changeset_status(repo, revision):
 
    from kallithea.model.changeset_status import ChangesetStatusModel
 
    return ChangesetStatusModel().get_status(repo, revision)
 

	
 

	
 
def changeset_status_lbl(changeset_status):
 
    from kallithea.model.db import ChangesetStatus
 
    return ChangesetStatus.get_status_lbl(changeset_status)
 

	
 

	
 
def get_permission_name(key):
 
    from kallithea.model.db import Permission
 
    return dict(Permission.PERMS).get(key)
 

	
 

	
 
def journal_filter_help():
 
    return _(textwrap.dedent('''
 
        Example filter terms:
 
            repository:vcs
 
            username:developer
 
            action:*push*
 
            ip:127.0.0.1
 
            date:20120101
 
            date:[20120101100000 TO 20120102]
 

	
 
        Generate wildcards using '*' character:
 
            "repository:vcs*" - search everything starting with 'vcs'
 
            "repository:*vcs*" - search for repository containing 'vcs'
 

	
 
        Optional AND / OR operators in queries
 
            "repository:vcs OR repository:test"
 
            "username:test AND repository:test*"
 
    '''))
 

	
 

	
 
def not_mapped_error(repo_name):
 
    flash(_('%s repository is not mapped to db perhaps'
 
            ' it was created or renamed from the filesystem'
 
            ' please run the application again'
 
            ' in order to rescan repositories') % repo_name, category='error')
 

	
 

	
 
def ip_range(ip_addr):
 
    from kallithea.model.db import UserIpMap
 
    s, e = UserIpMap._get_ip_range(ip_addr)
 
    return '%s - %s' % (s, e)
 

	
 

	
 
session_csrf_secret_name = "_session_csrf_secret_token"
 

	
 
def session_csrf_secret_token():
 
    """Return (and create) the current session's CSRF protection token."""
 
    from tg import session
 
    if not session_csrf_secret_name in session:
 
        session[session_csrf_secret_name] = str(random.getrandbits(128))
 
        session.save()
 
    return session[session_csrf_secret_name]
 

	
 
def form(url, method="post", **attrs):
 
    """Like webhelpers.html.tags.form , but automatically adding
 
    session_csrf_secret_token for POST. The secret is thus never leaked in GET
 
    URLs.
 
    """
 
    form = insecure_form(url, method, **attrs)
 
    if method.lower() == 'get':
 
        return form
 
    return form + HTML.div(hidden(session_csrf_secret_name, session_csrf_secret_token()), style="display: none;")
kallithea/lib/markup_renderer.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.markup_renderer
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Renderer for markup languages with ability to parse using rst or markdown
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Oct 27, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import re
 
import traceback
 

	
 
import bleach
 
import markdown as markdown_mod
 

	
 
from kallithea.lib.utils2 import MENTIONS_REGEX, safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
url_re = re.compile(r'''\bhttps?://(?:[\da-zA-Z0-9@:.-]+)'''
 
                    r'''(?:[/a-zA-Z0-9_=@#~&+%.,:;?!*()-]*[/a-zA-Z0-9_=@#~])?''')
 

	
 

	
 
class MarkupRenderer(object):
 
    RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES = ['include', 'meta', 'raw']
 

	
 
    MARKDOWN_PAT = re.compile(r'md|mkdn?|mdown|markdown', re.IGNORECASE)
 
    RST_PAT = re.compile(r're?st', re.IGNORECASE)
 
    PLAIN_PAT = re.compile(r'readme', re.IGNORECASE)
 

	
 
    @classmethod
 
    def _detect_renderer(cls, source, filename):
 
        """
 
        runs detection of what renderer should be used for generating html
 
        from a markup language
 

	
 
        filename can be also explicitly a renderer name
 
        """
 

	
 
        if cls.MARKDOWN_PAT.findall(filename):
 
            return cls.markdown
 
        elif cls.RST_PAT.findall(filename):
 
            return cls.rst
 
        elif cls.PLAIN_PAT.findall(filename):
 
            return cls.rst
 
        return cls.plain
 

	
 
    @classmethod
 
    def _flavored_markdown(cls, text):
 
        """
 
        Github style flavored markdown
 

	
 
        :param text:
 
        """
 
        from hashlib import md5
 

	
 
        # Extract pre blocks.
 
        extractions = {}
 

	
 
        def pre_extraction_callback(matchobj):
 
            digest = md5(matchobj.group(0)).hexdigest()
 
            extractions[digest] = matchobj.group(0)
 
            return "{gfm-extraction-%s}" % digest
 
        pattern = re.compile(r'<pre>.*?</pre>', re.MULTILINE | re.DOTALL)
 
        text = re.sub(pattern, pre_extraction_callback, text)
 

	
 
        # Prevent foo_bar_baz from ending up with an italic word in the middle.
 
        def italic_callback(matchobj):
 
            s = matchobj.group(0)
 
            if list(s).count('_') >= 2:
 
                return s.replace('_', '\_')
 
                return s.replace('_', r'\_')
 
            return s
 
        text = re.sub(r'^(?! {4}|\t)\w+_\w+_\w[\w_]*', italic_callback, text)
 

	
 
        # In very clear cases, let newlines become <br /> tags.
 
        def newline_callback(matchobj):
 
            if len(matchobj.group(1)) == 1:
 
                return matchobj.group(0).rstrip() + '  \n'
 
            else:
 
                return matchobj.group(0)
 
        pattern = re.compile(r'^[\w\<][^\n]*(\n+)', re.MULTILINE)
 
        text = re.sub(pattern, newline_callback, text)
 

	
 
        # Insert pre block extractions.
 
        def pre_insert_callback(matchobj):
 
            return '\n\n' + extractions[matchobj.group(1)]
 
        text = re.sub(r'{gfm-extraction-([0-9a-f]{32})\}',
 
                      pre_insert_callback, text)
 

	
 
        return text
 

	
 
    @classmethod
 
    def render(cls, source, filename=None):
 
        """
 
        Renders a given filename using detected renderer
 
        it detects renderers based on file extension or mimetype.
 
        At last it will just do a simple html replacing new lines with <br/>
 

	
 
        >>> MarkupRenderer.render('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''', '.md')
 
        u'<p><img id="a" src="http://example.com/test.jpg" style="color: red;"></p>'
 
        >>> MarkupRenderer.render('''<img class="c d" src="file://localhost/test.jpg">''', 'b.mkd')
 
        u'<p><img class="c d"></p>'
 
        >>> MarkupRenderer.render('''<a href="foo">foo</a>''', 'c.mkdn')
 
        u'<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.render('''<script>alert(1)</script>''', 'd.mdown')
 
        u'&lt;script&gt;alert(1)&lt;/script&gt;'
 
        >>> MarkupRenderer.render('''<div onclick="alert(2)">yo</div>''', 'markdown')
 
        u'<div>yo</div>'
 
        >>> MarkupRenderer.render('''<a href="javascript:alert(3)">yo</a>''', 'md')
 
        u'<p><a>yo</a></p>'
 
        """
 

	
 
        renderer = cls._detect_renderer(source, filename)
 
        readme_data = renderer(source)
 
        # Allow most HTML, while preventing XSS issues:
 
        # no <script> tags, no onclick attributes, no javascript
 
        # "protocol", and also limit styling to prevent defacing.
 
        return bleach.clean(readme_data,
 
            tags=['a', 'abbr', 'b', 'blockquote', 'br', 'code', 'dd',
 
                  'div', 'dl', 'dt', 'em', 'h1', 'h2', 'h3', 'h4', 'h5',
 
                  'h6', 'hr', 'i', 'img', 'li', 'ol', 'p', 'pre', 'span',
 
                  'strong', 'sub', 'sup', 'table', 'tbody', 'td', 'th',
 
                  'thead', 'tr', 'ul'],
 
            attributes=['class', 'id', 'style', 'label', 'title', 'alt', 'href', 'src'],
 
            styles=['color'],
 
            protocols=['http', 'https', 'mailto'],
 
            )
 

	
 
    @classmethod
 
    def plain(cls, source, universal_newline=True):
 
        source = safe_unicode(source)
 
        if universal_newline:
 
            newline = '\n'
 
            source = newline.join(source.splitlines())
 

	
 
        def url_func(match_obj):
 
            url_full = match_obj.group(0)
 
            return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
 
        source = url_re.sub(url_func, source)
 
        return '<br />' + source.replace("\n", '<br />')
 

	
 
    @classmethod
 
    def markdown(cls, source, safe=True, flavored=False):
 
        """
 
        Convert Markdown (possibly GitHub Flavored) to INSECURE HTML, possibly
 
        with "safe" fall-back to plaintext. Output from this method should be sanitized before use.
 

	
 
        >>> MarkupRenderer.markdown('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''')
 
        u'<p><img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<img class="c d" src="file://localhost/test.jpg">''')
 
        u'<p><img class="c d" src="file://localhost/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<a href="foo">foo</a>''')
 
        u'<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.markdown('''<script>alert(1)</script>''')
 
        u'<script>alert(1)</script>'
 
        >>> MarkupRenderer.markdown('''<div onclick="alert(2)">yo</div>''')
 
        u'<div onclick="alert(2)">yo</div>'
 
        >>> MarkupRenderer.markdown('''<a href="javascript:alert(3)">yo</a>''')
 
        u'<p><a href="javascript:alert(3)">yo</a></p>'
 
        >>> MarkupRenderer.markdown('''## Foo''')
 
        u'<h2>Foo</h2>'
 
        >>> print MarkupRenderer.markdown('''
 
        ...     #!/bin/bash
 
        ...     echo "hello"
 
        ... ''')
 
        <table class="code-highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre>1
 
        2</pre></div></td><td class="code"><div class="code-highlight"><pre><span></span><span class="ch">#!/bin/bash</span>
 
        <span class="nb">echo</span> <span class="s2">&quot;hello&quot;</span>
 
        </pre></div>
 
        </td></tr></table>
 
        """
 
        source = safe_unicode(source)
 
        try:
 
            if flavored:
 
                source = cls._flavored_markdown(source)
 
            return markdown_mod.markdown(
 
                source,
 
                extensions=['markdown.extensions.codehilite', 'markdown.extensions.extra'],
 
                extension_configs={'markdown.extensions.codehilite': {'css_class': 'code-highlight'}})
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst(cls, source, safe=True):
 
        source = safe_unicode(source)
 
        try:
 
            from docutils.core import publish_parts
 
            from docutils.parsers.rst import directives
 
            docutils_settings = dict([(alias, None) for alias in
 
                                cls.RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES])
 

	
 
            docutils_settings.update({'input_encoding': 'unicode',
 
                                      'report_level': 4})
 

	
 
            for k, v in docutils_settings.iteritems():
 
                directives.register_directive(k, v)
 

	
 
            parts = publish_parts(source=source,
 
                                  writer_name="html4css1",
 
                                  settings_overrides=docutils_settings)
 

	
 
            return parts['html_title'] + parts["fragment"]
 
        except ImportError:
 
            log.warning('Install docutils to use this function')
 
            return cls.plain(source)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst_with_mentions(cls, source):
 

	
 
        def wrapp(match_obj):
 
            uname = match_obj.groups()[0]
 
            return '\ **@%(uname)s**\ ' % {'uname': uname}
 
            return r'\ **@%(uname)s**\ ' % {'uname': uname}
 
        mention_hl = MENTIONS_REGEX.sub(wrapp, source).strip()
 
        return cls.rst(mention_hl)
kallithea/lib/utils.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Utilities library for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 
import os
 
import re
 
import traceback
 

	
 
import beaker
 
from beaker.cache import _cache_decorate
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib.exceptions import HgsubversionImportError
 
from kallithea.lib.utils2 import get_current_authuser, safe_str, safe_unicode
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.utils.fakemod import create_module
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.hgcompat import config, ui
 
from kallithea.model import meta
 
from kallithea.model.db import RepoGroup, Repository, Setting, Ui, User, UserGroup, UserLog
 
from kallithea.model.repo_group import RepoGroupModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
 

	
 

	
 
#==============================================================================
 
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
 
#==============================================================================
 
def get_repo_slug(request):
 
    _repo = request.environ['pylons.routes_dict'].get('repo_name')
 
    if _repo:
 
        _repo = _repo.rstrip('/')
 
    return _repo
 

	
 

	
 
def get_repo_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('group_name')
 
    if _group:
 
        _group = _group.rstrip('/')
 
    return _group
 

	
 

	
 
def get_user_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('id')
 
    _group = UserGroup.get(_group)
 
    if _group:
 
        return _group.users_group_name
 
    return None
 

	
 

	
 
def _get_permanent_id(s):
 
    """Helper for decoding stable URLs with repo ID. For a string like '_123'
 
    return 123.
 
    """
 
    by_id_match = re.match(r'^_(\d+)$', s)
 
    if by_id_match is None:
 
        return None
 
    return int(by_id_match.group(1))
 

	
 

	
 
def fix_repo_id_name(path):
 
    """
 
    Rewrite repo_name for _<ID> permanent URLs.
 

	
 
    Given a path, if the first path element is like _<ID>, return the path with
 
    this part expanded to the corresponding full repo name, else return the
 
    provided path.
 
    """
 
    first, rest = path, ''
 
    if '/' in path:
 
        first, rest_ = path.split('/', 1)
 
        rest = '/' + rest_
 
    repo_id = _get_permanent_id(first)
 
    if repo_id is not None:
 
        from kallithea.model.db import Repository
 
        repo = Repository.get(repo_id)
 
        if repo is not None:
 
            return repo.repo_name + rest
 
    return path
 

	
 

	
 
def action_logger(user, action, repo, ipaddr='', commit=False):
 
    """
 
    Action logger for various actions made by users
 

	
 
    :param user: user that made this action, can be a unique username string or
 
        object containing user_id attribute
 
    :param action: action to log, should be on of predefined unique actions for
 
        easy translations
 
    :param repo: string name of repository or object containing repo_id,
 
        that action was made on
 
    :param ipaddr: optional IP address from what the action was made
 

	
 
    """
 

	
 
    # if we don't get explicit IP address try to get one from registered user
 
    # in tmpl context var
 
    if not ipaddr:
 
        ipaddr = getattr(get_current_authuser(), 'ip_addr', '')
 

	
 
    if getattr(user, 'user_id', None):
 
        user_obj = User.get(user.user_id)
 
    elif isinstance(user, basestring):
 
        user_obj = User.get_by_username(user)
 
    else:
 
        raise Exception('You have to provide a user object or a username')
 

	
 
    if getattr(repo, 'repo_id', None):
 
        repo_obj = Repository.get(repo.repo_id)
 
        repo_name = repo_obj.repo_name
 
    elif isinstance(repo, basestring):
 
        repo_name = repo.lstrip('/')
 
        repo_obj = Repository.get_by_repo_name(repo_name)
 
    else:
 
        repo_obj = None
 
        repo_name = u''
 

	
 
    user_log = UserLog()
 
    user_log.user_id = user_obj.user_id
 
    user_log.username = user_obj.username
 
    user_log.action = safe_unicode(action)
 

	
 
    user_log.repository = repo_obj
 
    user_log.repository_name = repo_name
 

	
 
    user_log.action_date = datetime.datetime.now()
 
    user_log.user_ip = ipaddr
 
    meta.Session().add(user_log)
 

	
 
    log.info('Logging action:%s on %s by user:%s ip:%s',
 
             action, safe_unicode(repo), user_obj, ipaddr)
 
    if commit:
 
        meta.Session().commit()
 

	
 

	
 
def get_filesystem_repos(path):
 
    """
 
    Scans given path for repos and return (name,(type,path)) tuple
 

	
 
    :param path: path to scan for repositories
 
    :param recursive: recursive search and return names with subdirs in front
 
    """
 

	
 
    # remove ending slash for better results
 
    path = safe_str(path.rstrip(os.sep))
 
    log.debug('now scanning in %s', path)
 

	
 
    def isdir(*n):
 
        return os.path.isdir(os.path.join(*n))
 

	
 
    for root, dirs, _files in os.walk(path):
 
        recurse_dirs = []
 
        for subdir in dirs:
 
            # skip removed repos
 
            if REMOVED_REPO_PAT.match(subdir):
 
                continue
 

	
 
            # skip .<something> dirs TODO: rly? then we should prevent creating them ...
 
            if subdir.startswith('.'):
 
                continue
 

	
 
            cur_path = os.path.join(root, subdir)
 
            if isdir(cur_path, '.git'):
 
                log.warning('ignoring non-bare Git repo: %s', cur_path)
 
                continue
 

	
 
            if (isdir(cur_path, '.hg') or
 
                isdir(cur_path, '.svn') or
 
                isdir(cur_path, 'objects') and (isdir(cur_path, 'refs') or
 
                                                os.path.isfile(os.path.join(cur_path, 'packed-refs')))):
 

	
 
                if not os.access(cur_path, os.R_OK) or not os.access(cur_path, os.X_OK):
 
                    log.warning('ignoring repo path without access: %s', cur_path)
 
                    continue
 

	
 
                if not os.access(cur_path, os.W_OK):
 
                    log.warning('repo path without write access: %s', cur_path)
 

	
 
                try:
 
                    scm_info = get_scm(cur_path)
 
                    assert cur_path.startswith(path)
 
                    repo_path = cur_path[len(path) + 1:]
 
                    yield repo_path, scm_info
 
                    continue # no recursion
 
                except VCSError:
 
                    # We should perhaps ignore such broken repos, but especially
 
                    # the bare git detection is unreliable so we dive into it
 
                    pass
 

	
 
            recurse_dirs.append(subdir)
 

	
 
        dirs[:] = recurse_dirs
 

	
 

	
 
def is_valid_repo_uri(repo_type, url, ui):
 
    """Check if the url seems like a valid remote repo location - raise an Exception if any problems"""
 
    if repo_type == 'hg':
 
        from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
        if url.startswith('http') or url.startswith('ssh'):
 
            # initially check if it's at least the proper URL
 
            # or does it pass basic auth
 
            MercurialRepository._check_url(url, ui)
 
        elif url.startswith('svn+http'):
 
            try:
 
                from hgsubversion.svnrepo import svnremoterepo
 
            except ImportError:
 
                raise HgsubversionImportError(_('Unable to activate hgsubversion support. '
 
                                                'The "hgsubversion" library is missing'))
 
            svnremoterepo(ui, url).svn.uuid
 
        elif url.startswith('git+http'):
 
            raise NotImplementedError()
 
        else:
 
            raise Exception('URI %s not allowed' % (url,))
 

	
 
    elif repo_type == 'git':
 
        from kallithea.lib.vcs.backends.git.repository import GitRepository
 
        if url.startswith('http') or url.startswith('git'):
 
            # initially check if it's at least the proper URL
 
            # or does it pass basic auth
 
            GitRepository._check_url(url)
 
        elif url.startswith('svn+http'):
 
            raise NotImplementedError()
 
        elif url.startswith('hg+http'):
 
            raise NotImplementedError()
 
        else:
 
            raise Exception('URI %s not allowed' % (url))
 

	
 

	
 
def is_valid_repo(repo_name, base_path, scm=None):
 
    """
 
    Returns True if given path is a valid repository False otherwise.
 
    If scm param is given also compare if given scm is the same as expected
 
    from scm parameter
 

	
 
    :param repo_name:
 
    :param base_path:
 
    :param scm:
 

	
 
    :return True: if given path is a valid repository
 
    """
 
    # TODO: paranoid security checks?
 
    full_path = os.path.join(safe_str(base_path), safe_str(repo_name))
 

	
 
    try:
 
        scm_ = get_scm(full_path)
 
        if scm:
 
            return scm_[0] == scm
 
        return True
 
    except VCSError:
 
        return False
 

	
 

	
 
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
 
    """
 
    Returns True if given path is a repository group False otherwise
 

	
 
    :param repo_name:
 
    :param base_path:
 
    """
 
    full_path = os.path.join(safe_str(base_path), safe_str(repo_group_name))
 

	
 
    # check if it's not a repo
 
    if is_valid_repo(repo_group_name, base_path):
 
        return False
 

	
 
    try:
 
        # we need to check bare git repos at higher level
 
        # since we might match branches/hooks/info/objects or possible
 
        # other things inside bare git repo
 
        get_scm(os.path.dirname(full_path))
 
        return False
 
    except VCSError:
 
        pass
 

	
 
    # check if it's a valid path
 
    if skip_path_check or os.path.isdir(full_path):
 
        return True
 

	
 
    return False
 

	
 

	
 
# propagated from mercurial documentation
 
ui_sections = ['alias', 'auth',
 
                'decode/encode', 'defaults',
 
                'diff', 'email',
 
                'extensions', 'format',
 
                'merge-patterns', 'merge-tools',
 
                'hooks', 'http_proxy',
 
                'smtp', 'patch',
 
                'paths', 'profiling',
 
                'server', 'trusted',
 
                'ui', 'web', ]
 

	
 

	
 
def make_ui(repo_path=None, clear_session=True):
 
    """
 
    Create an Mercurial 'ui' object based on database Ui settings, possibly
 
    augmenting with content from a hgrc file.
 
    """
 
    baseui = ui.ui()
 

	
 
    # clean the baseui object
 
    baseui._ocfg = config.config()
 
    baseui._ucfg = config.config()
 
    baseui._tcfg = config.config()
 

	
 
    sa = meta.Session()
 
    for ui_ in sa.query(Ui).all():
 
        if ui_.ui_active:
 
            ui_val = '' if ui_.ui_value is None else safe_str(ui_.ui_value)
 
            log.debug('config from db: [%s] %s=%r', ui_.ui_section,
 
                      ui_.ui_key, ui_val)
 
            baseui.setconfig(safe_str(ui_.ui_section), safe_str(ui_.ui_key),
 
                             ui_val)
 
    if clear_session:
 
        meta.Session.remove()
 

	
 
    # force set push_ssl requirement to False, Kallithea handles that
 
    baseui.setconfig('web', 'push_ssl', False)
 
    baseui.setconfig('web', 'allow_push', '*')
 
    # prevent interactive questions for ssh password / passphrase
 
    ssh = baseui.config('ui', 'ssh', default='ssh')
 
    baseui.setconfig('ui', 'ssh', '%s -oBatchMode=yes -oIdentitiesOnly=yes' % ssh)
 
    # push / pull hooks
 
    baseui.setconfig('hooks', 'changegroup.kallithea_log_push_action', 'python:kallithea.lib.hooks.log_push_action')
 
    baseui.setconfig('hooks', 'outgoing.kallithea_log_pull_action', 'python:kallithea.lib.hooks.log_pull_action')
 

	
 
    if repo_path is not None:
 
        hgrc_path = os.path.join(repo_path, '.hg', 'hgrc')
 
        if os.path.isfile(hgrc_path):
 
            log.debug('reading hgrc from %s', hgrc_path)
 
            cfg = config.config()
 
            cfg.read(hgrc_path)
 
            for section in ui_sections:
 
                for k, v in cfg.items(section):
 
                    log.debug('config from file: [%s] %s=%s', section, k, v)
 
                    baseui.setconfig(safe_str(section), safe_str(k), safe_str(v))
 
        else:
 
            log.debug('hgrc file is not present at %s, skipping...', hgrc_path)
 

	
 
    return baseui
 

	
 

	
 
def set_app_settings(config):
 
    """
 
    Updates app config with new settings from database
 

	
 
    :param config:
 
    """
 
    try:
 
        hgsettings = Setting.get_app_settings()
 
        for k, v in hgsettings.items():
 
            config[k] = v
 
    finally:
 
        meta.Session.remove()
 

	
 

	
 
def set_vcs_config(config):
 
    """
 
    Patch VCS config with some Kallithea specific stuff
 

	
 
    :param config: kallithea.CONFIG
 
    """
 
    from kallithea.lib.vcs import conf
 
    from kallithea.lib.utils2 import aslist
 
    conf.settings.BACKENDS = {
 
        'hg': 'kallithea.lib.vcs.backends.hg.MercurialRepository',
 
        'git': 'kallithea.lib.vcs.backends.git.GitRepository',
 
    }
 

	
 
    conf.settings.GIT_EXECUTABLE_PATH = config.get('git_path', 'git')
 
    conf.settings.GIT_REV_FILTER = config.get('git_rev_filter', '--all').strip()
 
    conf.settings.DEFAULT_ENCODINGS = aslist(config.get('default_encoding',
 
                                                        'utf-8'), sep=',')
 

	
 

	
 
def set_indexer_config(config):
 
    """
 
    Update Whoosh index mapping
 

	
 
    :param config: kallithea.CONFIG
 
    """
 
    from kallithea.config import conf
 

	
 
    log.debug('adding extra into INDEX_EXTENSIONS')
 
    conf.INDEX_EXTENSIONS.extend(re.split('\s+', config.get('index.extensions', '')))
 
    conf.INDEX_EXTENSIONS.extend(re.split(r'\s+', config.get('index.extensions', '')))
 

	
 
    log.debug('adding extra into INDEX_FILENAMES')
 
    conf.INDEX_FILENAMES.extend(re.split('\s+', config.get('index.filenames', '')))
 
    conf.INDEX_FILENAMES.extend(re.split(r'\s+', config.get('index.filenames', '')))
 

	
 

	
 
def map_groups(path):
 
    """
 
    Given a full path to a repository, create all nested groups that this
 
    repo is inside. This function creates parent-child relationships between
 
    groups and creates default perms for all new groups.
 

	
 
    :param paths: full path to repository
 
    """
 
    sa = meta.Session()
 
    groups = path.split(Repository.url_sep())
 
    parent = None
 
    group = None
 

	
 
    # last element is repo in nested groups structure
 
    groups = groups[:-1]
 
    rgm = RepoGroupModel()
 
    owner = User.get_first_admin()
 
    for lvl, group_name in enumerate(groups):
 
        group_name = u'/'.join(groups[:lvl] + [group_name])
 
        group = RepoGroup.get_by_group_name(group_name)
 
        desc = '%s group' % group_name
 

	
 
        # skip folders that are now removed repos
 
        if REMOVED_REPO_PAT.match(group_name):
 
            break
 

	
 
        if group is None:
 
            log.debug('creating group level: %s group_name: %s',
 
                      lvl, group_name)
 
            group = RepoGroup(group_name, parent)
 
            group.group_description = desc
 
            group.owner = owner
 
            sa.add(group)
 
            rgm._create_default_perms(group)
 
            sa.flush()
 

	
 
        parent = group
 
    return group
 

	
 

	
 
def repo2db_mapper(initial_repo_list, remove_obsolete=False,
 
                   install_git_hooks=False, user=None, overwrite_git_hooks=False):
 
    """
 
    maps all repos given in initial_repo_list, non existing repositories
 
    are created, if remove_obsolete is True it also check for db entries
 
    that are not in initial_repo_list and removes them.
 

	
 
    :param initial_repo_list: list of repositories found by scanning methods
 
    :param remove_obsolete: check for obsolete entries in database
 
    :param install_git_hooks: if this is True, also check and install git hook
 
        for a repo if missing
 
    :param overwrite_git_hooks: if this is True, overwrite any existing git hooks
 
        that may be encountered (even if user-deployed)
 
    """
 
    from kallithea.model.repo import RepoModel
 
    from kallithea.model.scm import ScmModel
 
    sa = meta.Session()
 
    repo_model = RepoModel()
 
    if user is None:
 
        user = User.get_first_admin()
 
    added = []
 

	
 
    # creation defaults
 
    defs = Setting.get_default_repo_settings(strip_prefix=True)
 
    enable_statistics = defs.get('repo_enable_statistics')
 
    enable_downloads = defs.get('repo_enable_downloads')
 
    private = defs.get('repo_private')
 

	
 
    for name, repo in initial_repo_list.items():
 
        group = map_groups(name)
 
        unicode_name = safe_unicode(name)
 
        db_repo = repo_model.get_by_repo_name(unicode_name)
 
        # found repo that is on filesystem not in Kallithea database
 
        if not db_repo:
 
            log.info('repository %s not found, creating now', name)
 
            added.append(name)
 
            desc = (repo.description
 
                    if repo.description != 'unknown'
 
                    else '%s repository' % name)
 

	
 
            new_repo = repo_model._create_repo(
 
                repo_name=name,
 
                repo_type=repo.alias,
 
                description=desc,
 
                repo_group=getattr(group, 'group_id', None),
 
                owner=user,
 
                enable_downloads=enable_downloads,
 
                enable_statistics=enable_statistics,
 
                private=private,
 
                state=Repository.STATE_CREATED
 
            )
 
            sa.commit()
 
            # we added that repo just now, and make sure it has githook
 
            # installed, and updated server info
 
            if new_repo.repo_type == 'git':
 
                git_repo = new_repo.scm_instance
 
                ScmModel().install_git_hooks(git_repo)
 
                # update repository server-info
 
                log.debug('Running update server info')
 
                git_repo._update_server_info()
 
            new_repo.update_changeset_cache()
 
        elif install_git_hooks:
 
            if db_repo.repo_type == 'git':
 
                ScmModel().install_git_hooks(db_repo.scm_instance, force_create=overwrite_git_hooks)
 

	
 
    removed = []
 
    # remove from database those repositories that are not in the filesystem
 
    unicode_initial_repo_list = set(safe_unicode(name) for name in initial_repo_list)
 
    for repo in sa.query(Repository).all():
 
        if repo.repo_name not in unicode_initial_repo_list:
 
            if remove_obsolete:
 
                log.debug("Removing non-existing repository found in db `%s`",
 
                          repo.repo_name)
 
                try:
 
                    RepoModel().delete(repo, forks='detach', fs_remove=False)
 
                    sa.commit()
 
                except Exception:
 
                    #don't hold further removals on error
 
                    log.error(traceback.format_exc())
 
                    sa.rollback()
 
            removed.append(repo.repo_name)
 
    return added, removed
 

	
 

	
 
def load_rcextensions(root_path):
 
    import kallithea
 
    from kallithea.config import conf
 

	
 
    path = os.path.join(root_path, 'rcextensions', '__init__.py')
 
    if os.path.isfile(path):
 
        rcext = create_module('rc', path)
 
        EXT = kallithea.EXTENSIONS = rcext
 
        log.debug('Found rcextensions now loading %s...', rcext)
 

	
 
        # Additional mappings that are not present in the pygments lexers
 
        conf.LANGUAGES_EXTENSIONS_MAP.update(getattr(EXT, 'EXTRA_MAPPINGS', {}))
 

	
 
        # OVERRIDE OUR EXTENSIONS FROM RC-EXTENSIONS (if present)
 

	
 
        if getattr(EXT, 'INDEX_EXTENSIONS', []):
 
            log.debug('settings custom INDEX_EXTENSIONS')
 
            conf.INDEX_EXTENSIONS = getattr(EXT, 'INDEX_EXTENSIONS', [])
 

	
 
        # ADDITIONAL MAPPINGS
 
        log.debug('adding extra into INDEX_EXTENSIONS')
 
        conf.INDEX_EXTENSIONS.extend(getattr(EXT, 'EXTRA_INDEX_EXTENSIONS', []))
 

	
 
        # auto check if the module is not missing any data, set to default if is
 
        # this will help autoupdate new feature of rcext module
 
        #from kallithea.config import rcextensions
 
        #for k in dir(rcextensions):
 
        #    if not k.startswith('_') and not hasattr(EXT, k):
 
        #        setattr(EXT, k, getattr(rcextensions, k))
 

	
 

	
 
#==============================================================================
 
# MISC
 
#==============================================================================
 

	
 
def check_git_version():
 
    """
 
    Checks what version of git is installed in system, and issues a warning
 
    if it's too old for Kallithea to work properly.
 
    """
 
    from kallithea import BACKENDS
 
    from kallithea.lib.vcs.backends.git.repository import GitRepository
 
    from kallithea.lib.vcs.conf import settings
 
    from distutils.version import StrictVersion
 

	
 
    if 'git' not in BACKENDS:
 
        return None
 

	
 
    stdout, stderr = GitRepository._run_git_command(['--version'], _bare=True,
 
                                                    _safe=True)
 

	
 
    m = re.search("\d+.\d+.\d+", stdout)
 
    m = re.search(r"\d+.\d+.\d+", stdout)
 
    if m:
 
        ver = StrictVersion(m.group(0))
 
    else:
 
        ver = StrictVersion('0.0.0')
 

	
 
    req_ver = StrictVersion('1.7.4')
 

	
 
    log.debug('Git executable: "%s" version %s detected: %s',
 
              settings.GIT_EXECUTABLE_PATH, ver, stdout)
 
    if stderr:
 
        log.warning('Error detecting git version: %r', stderr)
 
    elif ver < req_ver:
 
        log.warning('Kallithea detected git version %s, which is too old '
 
                    'for the system to function properly. '
 
                    'Please upgrade to version %s or later.' % (ver, req_ver))
 
    return ver
 

	
 

	
 
#===============================================================================
 
# CACHE RELATED METHODS
 
#===============================================================================
 

	
 
# set cache regions for beaker so celery can utilise it
 
def setup_cache_regions(settings):
 
    # Create dict with just beaker cache configs with prefix stripped
 
    cache_settings = {'regions': None}
 
    prefix = 'beaker.cache.'
 
    for key in settings:
 
        if key.startswith(prefix):
 
            name = key[len(prefix):]
 
            cache_settings[name] = settings[key]
 
    # Find all regions, apply defaults, and apply to beaker
 
    if cache_settings['regions']:
 
        for region in cache_settings['regions'].split(','):
 
            region = region.strip()
 
            prefix = region + '.'
 
            region_settings = {}
 
            for key in cache_settings:
 
                if key.startswith(prefix):
 
                    name = key[len(prefix):]
 
                    region_settings[name] = cache_settings[key]
 
            region_settings.setdefault('expire',
 
                                       cache_settings.get('expire', '60'))
 
            region_settings.setdefault('lock_dir',
 
                                       cache_settings.get('lock_dir'))
 
            region_settings.setdefault('data_dir',
 
                                       cache_settings.get('data_dir'))
 
            region_settings.setdefault('type',
 
                                       cache_settings.get('type', 'memory'))
 
            beaker.cache.cache_regions[region] = region_settings
 

	
 

	
 
def conditional_cache(region, prefix, condition, func):
 
    """
 

	
 
    Conditional caching function use like::
 
        def _c(arg):
 
            #heavy computation function
 
            return data
 

	
 
        # depending from condition the compute is wrapped in cache or not
 
        compute = conditional_cache('short_term', 'cache_desc', condition=True, func=func)
 
        return compute(arg)
 

	
 
    :param region: name of cache region
 
    :param prefix: cache region prefix
 
    :param condition: condition for cache to be triggered, and return data cached
 
    :param func: wrapped heavy function to compute
 

	
 
    """
 
    wrapped = func
 
    if condition:
 
        log.debug('conditional_cache: True, wrapping call of '
 
                  'func: %s into %s region cache' % (region, func))
 
        wrapped = _cache_decorate((prefix,), None, None, region)(func)
 

	
 
    return wrapped
kallithea/lib/utils2.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils2
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Some simple helper functions.
 
Note: all these functions should be independent of Kallithea classes, i.e.
 
models, controllers, etc.  to prevent import cycles.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 5, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import binascii
 
import datetime
 
import os
 
import pwd
 
import re
 
import time
 
import urllib
 

	
 
import urlobject
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from webhelpers2.text import collapse, remove_formatting, strip_tags
 

	
 
from kallithea.lib.compat import json
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
def str2bool(_str):
 
    """
 
    returns True/False value from given string, it tries to translate the
 
    string into boolean
 

	
 
    :param _str: string value to translate into boolean
 
    :rtype: boolean
 
    :returns: boolean from given string
 
    """
 
    if _str is None:
 
        return False
 
    if _str in (True, False):
 
        return _str
 
    _str = str(_str).strip().lower()
 
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
 

	
 

	
 
def aslist(obj, sep=None, strip=True):
 
    """
 
    Returns given string separated by sep as list
 

	
 
    :param obj:
 
    :param sep:
 
    :param strip:
 
    """
 
    if isinstance(obj, (basestring)):
 
        lst = obj.split(sep)
 
        if strip:
 
            lst = [v.strip() for v in lst]
 
        return lst
 
    elif isinstance(obj, (list, tuple)):
 
        return obj
 
    elif obj is None:
 
        return []
 
    else:
 
        return [obj]
 

	
 

	
 
def convert_line_endings(line, mode):
 
    """
 
    Converts a given line  "line end" according to given mode
 

	
 
    Available modes are::
 
        0 - Unix
 
        1 - Mac
 
        2 - DOS
 

	
 
    :param line: given line to convert
 
    :param mode: mode to convert to
 
    :rtype: str
 
    :return: converted line according to mode
 
    """
 
    from string import replace
 

	
 
    if mode == 0:
 
            line = replace(line, '\r\n', '\n')
 
            line = replace(line, '\r', '\n')
 
    elif mode == 1:
 
            line = replace(line, '\r\n', '\r')
 
            line = replace(line, '\n', '\r')
 
    elif mode == 2:
 
            line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
 
    return line
 

	
 

	
 
def detect_mode(line, default):
 
    """
 
    Detects line break for given line, if line break couldn't be found
 
    given default value is returned
 

	
 
    :param line: str line
 
    :param default: default
 
    :rtype: int
 
    :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
 
    """
 
    if line.endswith('\r\n'):
 
        return 2
 
    elif line.endswith('\n'):
 
        return 0
 
    elif line.endswith('\r'):
 
        return 1
 
    else:
 
        return default
 

	
 

	
 
def generate_api_key():
 
    """
 
    Generates a random (presumably unique) API key.
 

	
 
    This value is used in URLs and "Bearer" HTTP Authorization headers,
 
    which in practice means it should only contain URL-safe characters
 
    (RFC 3986):
 

	
 
        unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
 
    """
 
    # Hexadecimal certainly qualifies as URL-safe.
 
    return binascii.hexlify(os.urandom(20))
 

	
 

	
 
def safe_int(val, default=None):
 
    """
 
    Returns int() of val if val is not convertable to int use default
 
    instead
 

	
 
    :param val:
 
    :param default:
 
    """
 

	
 
    try:
 
        val = int(val)
 
    except (ValueError, TypeError):
 
        val = default
 

	
 
    return val
 

	
 

	
 
def safe_unicode(str_, from_encoding=None):
 
    """
 
    safe unicode function. Does few trick to turn str_ into unicode
 

	
 
    In case of UnicodeDecode error we try to return it with encoding detected
 
    by chardet library if it fails fallback to unicode with errors replaced
 

	
 
    :param str_: string to decode
 
    :rtype: unicode
 
    :returns: unicode object
 
    """
 
    if isinstance(str_, unicode):
 
        return str_
 

	
 
    if not from_encoding:
 
        import kallithea
 
        DEFAULT_ENCODINGS = aslist(kallithea.CONFIG.get('default_encoding',
 
                                                        'utf-8'), sep=',')
 
        from_encoding = DEFAULT_ENCODINGS
 

	
 
    if not isinstance(from_encoding, (list, tuple)):
 
        from_encoding = [from_encoding]
 

	
 
    try:
 
        return unicode(str_)
 
    except UnicodeDecodeError:
 
        pass
 

	
 
    for enc in from_encoding:
 
        try:
 
            return unicode(str_, enc)
 
        except UnicodeDecodeError:
 
            pass
 

	
 
    try:
 
        import chardet
 
        encoding = chardet.detect(str_)['encoding']
 
        if encoding is None:
 
            raise Exception()
 
        return str_.decode(encoding)
 
    except (ImportError, UnicodeDecodeError, Exception):
 
        return unicode(str_, from_encoding[0], 'replace')
 

	
 

	
 
def safe_str(unicode_, to_encoding=None):
 
    """
 
    safe str function. Does few trick to turn unicode_ into string
 

	
 
    In case of UnicodeEncodeError we try to return it with encoding detected
 
    by chardet library if it fails fallback to string with errors replaced
 

	
 
    :param unicode_: unicode to encode
 
    :rtype: str
 
    :returns: str object
 
    """
 

	
 
    # if it's not basestr cast to str
 
    if not isinstance(unicode_, basestring):
 
        return str(unicode_)
 

	
 
    if isinstance(unicode_, str):
 
        return unicode_
 

	
 
    if not to_encoding:
 
        import kallithea
 
        DEFAULT_ENCODINGS = aslist(kallithea.CONFIG.get('default_encoding',
 
                                                        'utf-8'), sep=',')
 
        to_encoding = DEFAULT_ENCODINGS
 

	
 
    if not isinstance(to_encoding, (list, tuple)):
 
        to_encoding = [to_encoding]
 

	
 
    for enc in to_encoding:
 
        try:
 
            return unicode_.encode(enc)
 
        except UnicodeEncodeError:
 
            pass
 

	
 
    try:
 
        import chardet
 
        encoding = chardet.detect(unicode_)['encoding']
 
        if encoding is None:
 
            raise UnicodeEncodeError()
 

	
 
        return unicode_.encode(encoding)
 
    except (ImportError, UnicodeEncodeError):
 
        return unicode_.encode(to_encoding[0], 'replace')
 

	
 

	
 
def remove_suffix(s, suffix):
 
    if s.endswith(suffix):
 
        s = s[:-1 * len(suffix)]
 
    return s
 

	
 

	
 
def remove_prefix(s, prefix):
 
    if s.startswith(prefix):
 
        s = s[len(prefix):]
 
    return s
 

	
 

	
 
def age(prevdate, show_short_version=False, now=None):
 
    """
 
    turns a datetime into an age string.
 
    If show_short_version is True, then it will generate a not so accurate but shorter string,
 
    example: 2days ago, instead of 2 days and 23 hours ago.
 

	
 
    :param prevdate: datetime object
 
    :param show_short_version: if it should approximate the date and return a shorter string
 
    :rtype: unicode
 
    :returns: unicode words describing age
 
    """
 
    now = now or datetime.datetime.now()
 
    order = ['year', 'month', 'day', 'hour', 'minute', 'second']
 
    deltas = {}
 
    future = False
 

	
 
    if prevdate > now:
 
        now, prevdate = prevdate, now
 
        future = True
 
    if future:
 
        prevdate = prevdate.replace(microsecond=0)
 
    # Get date parts deltas
 
    from dateutil import relativedelta
 
    for part in order:
 
        d = relativedelta.relativedelta(now, prevdate)
 
        deltas[part] = getattr(d, part + 's')
 

	
 
    # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
 
    # not 1 hour, -59 minutes and -59 seconds)
 
    for num, length in [(5, 60), (4, 60), (3, 24)]:  # seconds, minutes, hours
 
        part = order[num]
 
        carry_part = order[num - 1]
 

	
 
        if deltas[part] < 0:
 
            deltas[part] += length
 
            deltas[carry_part] -= 1
 

	
 
    # Same thing for days except that the increment depends on the (variable)
 
    # number of days in the month
 
    month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
 
    if deltas['day'] < 0:
 
        if prevdate.month == 2 and (prevdate.year % 4 == 0 and
 
            (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
 
            deltas['day'] += 29
 
        else:
 
            deltas['day'] += month_lengths[prevdate.month - 1]
 

	
 
        deltas['month'] -= 1
 

	
 
    if deltas['month'] < 0:
 
        deltas['month'] += 12
 
        deltas['year'] -= 1
 

	
 
    # In short version, we want nicer handling of ages of more than a year
 
    if show_short_version:
 
        if deltas['year'] == 1:
 
            # ages between 1 and 2 years: show as months
 
            deltas['month'] += 12
 
            deltas['year'] = 0
 
        if deltas['year'] >= 2:
 
            # ages 2+ years: round
 
            if deltas['month'] > 6:
 
                deltas['year'] += 1
 
                deltas['month'] = 0
 

	
 
    # Format the result
 
    fmt_funcs = {
 
        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
 
        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
 
        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
 
        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
 
        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
 
        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
 
    }
 

	
 
    for i, part in enumerate(order):
 
        value = deltas[part]
 
        if value == 0:
 
            continue
 

	
 
        if i < 5:
 
            sub_part = order[i + 1]
 
            sub_value = deltas[sub_part]
 
        else:
 
            sub_value = 0
 

	
 
        if sub_value == 0 or show_short_version:
 
            if future:
 
                return _('in %s') % fmt_funcs[part](value)
 
            else:
 
                return _('%s ago') % fmt_funcs[part](value)
 
        if future:
 
            return _('in %s and %s') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 
        else:
 
            return _('%s and %s ago') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 

	
 
    return _('just now')
 

	
 

	
 
def uri_filter(uri):
 
    """
 
    Removes user:password from given url string
 

	
 
    :param uri:
 
    :rtype: unicode
 
    :returns: filtered list of strings
 
    """
 
    if not uri:
 
        return ''
 

	
 
    proto = ''
 

	
 
    for pat in ('https://', 'http://', 'git://'):
 
        if uri.startswith(pat):
 
            uri = uri[len(pat):]
 
            proto = pat
 
            break
 

	
 
    # remove passwords and username
 
    uri = uri[uri.find('@') + 1:]
 

	
 
    # get the port
 
    cred_pos = uri.find(':')
 
    if cred_pos == -1:
 
        host, port = uri, None
 
    else:
 
        host, port = uri[:cred_pos], uri[cred_pos + 1:]
 

	
 
    return filter(None, [proto, host, port])
 

	
 

	
 
def credentials_filter(uri):
 
    """
 
    Returns a url with removed credentials
 

	
 
    :param uri:
 
    """
 

	
 
    uri = uri_filter(uri)
 
    # check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
 

	
 

	
 
def get_clone_url(clone_uri_tmpl, prefix_url, repo_name, repo_id, username=None):
 
    parsed_url = urlobject.URLObject(prefix_url)
 
    prefix = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
 
    try:
 
        system_user = pwd.getpwuid(os.getuid()).pw_name
 
    except Exception: # TODO: support all systems - especially Windows
 
        system_user = 'kallithea' # hardcoded default value ...
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': safe_unicode(urllib.quote(safe_str(username or ''))),
 
        'netloc': parsed_url.netloc + prefix,  # like "hostname:port/prefix" (with optional ":port" and "/prefix")
 
        'prefix': prefix, # undocumented, empty or starting with /
 
        'repo': repo_name,
 
        'repoid': str(repo_id),
 
        'system_user': safe_unicode(system_user),
 
        'hostname': parsed_url.hostname,
 
    }
 
    url = re.sub('{([^{}]+)}', lambda m: args.get(m.group(1), m.group(0)), clone_uri_tmpl)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(url)
 
    if not url_obj.username:
 
        url_obj = url_obj.with_username(None)
 

	
 
    return safe_unicode(url_obj)
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset if this changeset doesn't exists for a
 
    repo it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    from kallithea.lib.vcs.backends.base import BaseRepository
 
    from kallithea.lib.vcs.exceptions import RepositoryError
 
    from kallithea.lib.vcs.backends.base import EmptyChangeset
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass an Repository '
 
                        'object as first argument got %s', type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, basestring):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
 

	
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    from kallithea.model.db import User
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
        user = User.get_by_username(name, case_insensitive=True)
 
        if user is not None and not user.is_default_user:
 
            result.add(user)
 
    return result
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    from sqlalchemy.engine import url as sa_url
 
    from sqlalchemy.exc import ArgumentError
 
    try:
 
        _url = sa_url.make_url(engine or '')
 
    except ArgumentError:
 
        return engine
 
    if _url.password:
 
        _url.password = 'XXXXX'
 
    return str(_url)
 

	
 

	
 
def get_hook_environment():
 
    """
 
    Get hook context by deserializing the global KALLITHEA_EXTRAS environment
 
    variable.
 

	
 
    Called early in Git out-of-process hooks to get .ini config path so the
 
    basic environment can be configured properly. Also used in all hooks to get
 
    information about the action that triggered it.
 
    """
 

	
 
    try:
 
        extras = json.loads(os.environ['KALLITHEA_EXTRAS'])
 
    except KeyError:
 
        raise Exception("Environment variable KALLITHEA_EXTRAS not found")
 

	
 
    try:
 
        for k in ['username', 'repository', 'scm', 'action', 'ip']:
 
            extras[k]
 
    except KeyError:
 
        raise Exception('Missing key %s in KALLITHEA_EXTRAS %s' % (k, extras))
 

	
 
    return AttributeDict(extras)
 

	
 

	
 
def set_hook_environment(username, ip_addr, repo_name, repo_alias, action=None):
 
    """Prepare global context for running hooks by serializing data in the
 
    global KALLITHEA_EXTRAS environment variable.
 

	
 
    Most importantly, this allow Git hooks to do proper logging and updating of
 
    caches after pushes.
 

	
 
    Must always be called before anything with hooks are invoked.
 
    """
 
    from kallithea import CONFIG
 
    extras = {
 
        'ip': ip_addr, # used in log_push/pull_action action_logger
 
        'username': username,
 
        'action': action or 'push_local', # used in log_push_action_raw_ids action_logger
 
        'repository': repo_name,
 
        'scm': repo_alias, # used to pick hack in log_push_action_raw_ids
 
        'config': CONFIG['__file__'], # used by git hook to read config
 
    }
 
    os.environ['KALLITHEA_EXTRAS'] = json.dumps(extras)
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets kallithea user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    from tg import tmpl_context
 
    if hasattr(tmpl_context, 'authuser'):
 
        return tmpl_context.authuser
 

	
 
    return None
 

	
 

	
 
class OptionalAttr(object):
 
    """
 
    Special Optional Option that defines other attribute. Example::
 

	
 
        def test(apiuser, userid=Optional(OAttr('apiuser')):
 
            user = Optional.extract(userid)
 
            # calls
 

	
 
    """
 

	
 
    def __init__(self, attr_name):
 
        self.attr_name = attr_name
 

	
 
    def __repr__(self):
 
        return '<OptionalAttr:%s>' % self.attr_name
 

	
 
    def __call__(self):
 
        return self
 

	
 

	
 
# alias
 
OAttr = OptionalAttr
 

	
 

	
 
class Optional(object):
 
    """
 
    Defines an optional parameter::
 

	
 
        param = param.getval() if isinstance(param, Optional) else param
 
        param = param() if isinstance(param, Optional) else param
 

	
 
    is equivalent of::
 

	
 
        param = Optional.extract(param)
 

	
 
    """
 

	
 
    def __init__(self, type_):
 
        self.type_ = type_
 

	
 
    def __repr__(self):
 
        return '<Optional:%s>' % self.type_.__repr__()
 

	
 
    def __call__(self):
 
        return self.getval()
 

	
 
    def getval(self):
 
        """
 
        returns value from this Optional instance
 
        """
 
        if isinstance(self.type_, OAttr):
 
            # use params name
 
            return self.type_.attr_name
 
        return self.type_
 

	
 
    @classmethod
 
    def extract(cls, val):
 
        """
 
        Extracts value from Optional() instance
 

	
 
        :param val:
 
        :return: original value if it's not Optional instance else
 
            value of instance
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', safe_str(s)).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return slug of name of repository
 
    This function is called on each creation/modification
 
    of repository to prevent bad names in repo
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in """`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
    for c in r"""`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
 

	
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
 
    while True:
 
        ok = raw_input(prompt)
 
        if ok in ('y', 'ye', 'yes'):
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print complaint
0 comments (0 inline, 0 general)