Changeset - 7e9d3865b4f9
[Not reviewed]
default
0 4 0
Mads Kiilerich - 6 years ago 2020-01-22 22:30:13
mads@kiilerich.com
Grafted from: ba85f7670991
py3: support new stdlib module names configparser, urllib and http lib

From "2to3 -f imports".
4 files changed with 11 insertions and 11 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/ldap_sync.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.ldap_sync
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
LDAP sync script
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 06, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import urllib2
 
import uuid
 
from ConfigParser import ConfigParser
 
from configparser import ConfigParser
 

	
 
import ldap
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.utils2 import ascii_bytes
 

	
 

	
 
config = ConfigParser()
 
config.read('ldap_sync.conf')
 

	
 

	
 
class InvalidResponseIDError(Exception):
 
    """ Request and response don't have the same UUID. """
 

	
 

	
 
class ResponseError(Exception):
 
    """ Response has an error, something went wrong with request execution. """
 

	
 

	
 
class UserAlreadyInGroupError(Exception):
 
    """ User is already a member of the target group. """
 

	
 

	
 
class UserNotInGroupError(Exception):
 
    """ User is not a member of the target group. """
 

	
 

	
 
class API(object):
 

	
 
    def __init__(self, url, key):
 
        self.url = url
 
        self.key = key
 

	
 
    def get_api_data(self, uid, method, args):
 
        """Prepare dict for API post."""
 
        return {
 
            "id": uid,
 
            "api_key": self.key,
 
            "method": method,
 
            "args": args
 
        }
 

	
 
    def post(self, method, args):
 
        """Send a generic API post to Kallithea.
 

	
 
        This will generate the UUID for validation check after the
 
        response is returned. Handle errors and get the result back.
 
        """
 
        uid = str(uuid.uuid1())
 
        data = self.get_api_data(uid, method, args)
 

	
 
        data = ascii_bytes(ext_json.dumps(data))
 
        headers = {'content-type': 'text/plain'}
 
        req = urllib2.Request(self.url, data, headers)
 

	
 
        response = urllib2.urlopen(req)
 
        response = ext_json.load(response)
 

	
 
        if uid != response["id"]:
 
            raise InvalidResponseIDError("UUID does not match.")
 

	
 
        if response["error"] is not None:
 
            raise ResponseError(response["error"])
 

	
 
        return response["result"]
 

	
 
    def create_group(self, name, active=True):
 
        """Create the Kallithea user group."""
 
        args = {
 
            "group_name": name,
 
            "active": str(active)
 
        }
 
        self.post("create_user_group", args)
 

	
 
    def add_membership(self, group, username):
 
        """Add specific user to a group."""
 
        args = {
 
            "usersgroupid": group,
 
            "userid": username
 
        }
 
        result = self.post("add_user_to_user_group", args)
 
        if not result["success"]:
 
            raise UserAlreadyInGroupError("User %s already in group %s." %
 
                                          (username, group))
 

	
 
    def remove_membership(self, group, username):
 
        """Remove specific user from a group."""
 
        args = {
 
            "usersgroupid": group,
 
            "userid": username
 
        }
 
        result = self.post("remove_user_from_user_group", args)
 
        if not result["success"]:
 
            raise UserNotInGroupError("User %s not in group %s." %
 
                                      (username, group))
 

	
 
    def get_group_members(self, name):
 
        """Get the list of member usernames from a user group."""
 
        args = {"usersgroupid": name}
 
        members = self.post("get_user_group", args)['members']
 
        member_list = []
 
        for member in members:
 
            member_list.append(member["username"])
 
        return member_list
 

	
 
    def get_group(self, name):
 
        """Return group info."""
 
        args = {"usersgroupid": name}
 
        return self.post("get_user_group", args)
 

	
 
    def get_user(self, username):
 
        """Return user info."""
 
        args = {"userid": username}
 
        return self.post("get_user", args)
 

	
 

	
 
class LdapClient(object):
 

	
 
    def __init__(self, uri, user, key, base_dn):
 
        self.client = ldap.initialize(uri, trace_level=0)
 
        self.client.set_option(ldap.OPT_REFERRALS, 0)
 
        self.client.simple_bind(user, key)
 
        self.base_dn = base_dn
 

	
 
    def close(self):
 
        self.client.unbind()
 

	
 
    def get_groups(self):
 
        """Get all the groups in form of dict {group_name: group_info,...}."""
 
        searchFilter = "objectClass=groupOfUniqueNames"
 
        result = self.client.search_s(self.base_dn, ldap.SCOPE_SUBTREE,
 
                                      searchFilter)
 

	
 
        groups = {}
 
        for group in result:
 
            groups[group[1]['cn'][0]] = group[1]
 

	
 
        return groups
 

	
 
    def get_group_users(self, groups, group):
 
        """Returns all the users belonging to a single group.
 

	
 
        Based on the list of groups and memberships, returns all the
 
        users belonging to a single group, searching recursively.
 
        """
 
        users = []
 
        for member in groups[group]["uniqueMember"]:
 
            member = self.parse_member_string(member)
 
            if member[0] == "uid":
 
                users.append(member[1])
 
            elif member[0] == "cn":
 
                users += self.get_group_users(groups, member[1])
 

	
 
        return users
 

	
 
    def parse_member_string(self, member):
 
        """Parses the member string and returns a touple of type and name.
 

	
 
        Unique member can be either user or group. Users will have 'uid' as
 
        prefix while groups will have 'cn'.
 
        """
 
        member = member.split(",")[0]
 
        return member.split('=')
 

	
 

	
 
class LdapSync(object):
 

	
 
    def __init__(self):
 
        self.ldap_client = LdapClient(config.get("default", "ldap_uri"),
 
                                      config.get("default", "ldap_user"),
 
                                      config.get("default", "ldap_key"),
 
                                      config.get("default", "base_dn"))
 
        self.kallithea_api = API(config.get("default", "api_url"),
 
                                 config.get("default", "api_key"))
 

	
 
    def update_groups_from_ldap(self):
 
        """Add all the groups from LDAP to Kallithea."""
 
        added = existing = 0
 
        groups = self.ldap_client.get_groups()
 
        for group in groups:
 
            try:
 
                self.kallithea_api.create_group(group)
 
                added += 1
 
            except Exception:
 
                existing += 1
 

	
 
        return added, existing
 

	
 
    def update_memberships_from_ldap(self, group):
 
        """Update memberships based on the LDAP groups."""
 
        groups = self.ldap_client.get_groups()
 
        group_users = self.ldap_client.get_group_users(groups, group)
kallithea/lib/helpers.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Helper functions
 

	
 
Consists of functions to typically be used within templates, but also
 
available to Controllers. This module is available to both as 'h'.
 
"""
 
import hashlib
 
import json
 
import logging
 
import random
 
import re
 
import textwrap
 
import urlparse
 
import urllib.parse
 

	
 
from beaker.cache import cache_region
 
from pygments import highlight as code_highlight
 
from pygments.formatters.html import HtmlFormatter
 
from tg.i18n import ugettext as _
 
from webhelpers2.html import HTML, escape, literal
 
from webhelpers2.html.tags import NotGiven, Option, Options, _input, _make_safe_id_component, checkbox, end_form
 
from webhelpers2.html.tags import form as insecure_form
 
from webhelpers2.html.tags import hidden, link_to, password, radio
 
from webhelpers2.html.tags import select as webhelpers2_select
 
from webhelpers2.html.tags import submit, text, textarea
 
from webhelpers2.number import format_byte_size
 
from webhelpers2.text import chop_at, truncate, wrap_paragraphs
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib.annotate import annotate_highlight
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel
 
from kallithea.lib.markup_renderer import url_re
 
from kallithea.lib.pygmentsutils import get_custom_lexer
 
from kallithea.lib.utils2 import MENTIONS_REGEX, AttributeDict
 
from kallithea.lib.utils2 import age as _age
 
from kallithea.lib.utils2 import credentials_filter, safe_bytes, safe_int, safe_unicode, str2bool, time_to_datetime
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_email, author_name
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def canonical_url(*args, **kargs):
 
    '''Like url(x, qualified=True), but returns url that not only is qualified
 
    but also canonical, as configured in canonical_url'''
 
    from kallithea import CONFIG
 
    try:
 
        parts = CONFIG.get('canonical_url', '').split('://', 1)
 
        kargs['host'] = parts[1]
 
        kargs['protocol'] = parts[0]
 
    except IndexError:
 
        kargs['qualified'] = True
 
    return url(*args, **kargs)
 

	
 

	
 
def canonical_hostname():
 
    '''Return canonical hostname of system'''
 
    from kallithea import CONFIG
 
    try:
 
        parts = CONFIG.get('canonical_url', '').split('://', 1)
 
        return parts[1].split('/', 1)[0]
 
    except IndexError:
 
        parts = url('home', qualified=True).split('://', 1)
 
        return parts[1].split('/', 1)[0]
 

	
 

	
 
def html_escape(s):
 
    """Return string with all html escaped.
 
    This is also safe for javascript in html but not necessarily correct.
 
    """
 
    return (s
 
        .replace('&', '&amp;')
 
        .replace(">", "&gt;")
 
        .replace("<", "&lt;")
 
        .replace('"', "&quot;")
 
        .replace("'", "&apos;") # Note: this is HTML5 not HTML4 and might not work in mails
 
        )
 

	
 
def js(value):
 
    """Convert Python value to the corresponding JavaScript representation.
 

	
 
    This is necessary to safely insert arbitrary values into HTML <script>
 
    sections e.g. using Mako template expression substitution.
 

	
 
    Note: Rather than using this function, it's preferable to avoid the
 
    insertion of values into HTML <script> sections altogether. Instead,
 
    data should (to the extent possible) be passed to JavaScript using
 
    data attributes or AJAX calls, eliminating the need for JS specific
 
    escaping.
 

	
 
    Note: This is not safe for use in attributes (e.g. onclick), because
 
    quotes are not escaped.
 

	
 
    Because the rules for parsing <script> varies between XHTML (where
 
    normal rules apply for any special characters) and HTML (where
 
    entities are not interpreted, but the literal string "</script>"
 
    is forbidden), the function ensures that the result never contains
 
    '&', '<' and '>', thus making it safe in both those contexts (but
 
    not in attributes).
 
    """
 
    return literal(
 
        ('(' + json.dumps(value) + ')')
 
        # In JSON, the following can only appear in string literals.
 
        .replace('&', r'\x26')
 
        .replace('<', r'\x3c')
 
        .replace('>', r'\x3e')
 
    )
 

	
 

	
 
def jshtml(val):
 
    """HTML escapes a string value, then converts the resulting string
 
    to its corresponding JavaScript representation (see `js`).
 

	
 
    This is used when a plain-text string (possibly containing special
 
    HTML characters) will be used by a script in an HTML context (e.g.
 
    element.innerHTML or jQuery's 'html' method).
 

	
 
    If in doubt, err on the side of using `jshtml` over `js`, since it's
 
    better to escape too much than too little.
 
    """
 
    return js(escape(val))
 

	
 

	
 
def shorter(s, size=20, firstline=False, postfix='...'):
 
    """Truncate s to size, including the postfix string if truncating.
 
    If firstline, truncate at newline.
 
    """
 
    if firstline:
 
        s = s.split('\n', 1)[0].rstrip()
 
    if len(s) > size:
 
        return s[:size - len(postfix)] + postfix
 
    return s
 

	
 

	
 
def reset(name, value, id=NotGiven, **attrs):
 
    """Create a reset button, similar to webhelpers2.html.tags.submit ."""
 
    return _input("reset", name, value, id, attrs)
 

	
 

	
 
def select(name, selected_values, options, id=NotGiven, **attrs):
 
    """Convenient wrapper of webhelpers2 to let it accept options as a tuple list"""
 
    if isinstance(options, list):
 
        option_list = options
 
        # Handle old value,label lists ... where value also can be value,label lists
 
        options = Options()
 
        for x in option_list:
 
            if isinstance(x, tuple) and len(x) == 2:
 
                value, label = x
 
            elif isinstance(x, str):
 
                value = label = x
 
            else:
 
                log.error('invalid select option %r', x)
 
                raise
 
            if isinstance(value, list):
 
                og = options.add_optgroup(label)
 
                for x in value:
 
                    if isinstance(x, tuple) and len(x) == 2:
 
                        group_value, group_label = x
 
                    elif isinstance(x, str):
 
                        group_value = group_label = x
 
                    else:
 
                        log.error('invalid select option %r', x)
 
                        raise
 
                    og.add_option(group_label, group_value)
 
            else:
 
                options.add_option(label, value)
 
    return webhelpers2_select(name, selected_values, options, id=id, **attrs)
 

	
 

	
 
safeid = _make_safe_id_component
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a unique ID for filenode based on it's hash of path and revision
 
    it's safe to use in urls
 

	
 
    :param raw_id:
 
    :param path:
 
    """
 

	
 
    return 'C-%s-%s' % (short_id(raw_id), hashlib.md5(safe_bytes(path)).hexdigest()[:12])
 

	
 

	
 
class _FilesBreadCrumbs(object):
 

	
 
    def __call__(self, repo_name, rev, paths):
 
        if isinstance(paths, str):
 
            paths = safe_unicode(paths)
 
        url_l = [link_to(repo_name, url('files_home',
 
                                        repo_name=repo_name,
 
                                        revision=rev, f_path=''),
 
                         class_='ypjax-link')]
 
        paths_l = paths.split('/')
 
        for cnt, p in enumerate(paths_l):
 
            if p != '':
 
                url_l.append(link_to(p,
 
                                     url('files_home',
 
@@ -743,385 +743,385 @@ def action_parser(user_log, feed=False, 
 
            else:
 
                html_tmpl = '<span id="%s"> %s </span>'
 

	
 
            morelinks = ', '.join(
 
              [lnk(rev, repo_name) for rev in revs[revs_limit:]]
 
            )
 

	
 
            if len(revs_ids) > revs_top_limit:
 
                morelinks += ', ...'
 

	
 
            cs_links.append(html_tmpl % (uniq_id, morelinks))
 
        if len(revs) > 1:
 
            cs_links.append(compare_view)
 
        return ''.join(cs_links)
 

	
 
    def get_fork_name():
 
        repo_name = action_params
 
        url_ = url('summary_home', repo_name=repo_name)
 
        return _('Fork name %s') % link_to(action_params, url_)
 

	
 
    def get_user_name():
 
        user_name = action_params
 
        return user_name
 

	
 
    def get_users_group():
 
        group_name = action_params
 
        return group_name
 

	
 
    def get_pull_request():
 
        from kallithea.model.db import PullRequest
 
        pull_request_id = action_params
 
        nice_id = PullRequest.make_nice_id(pull_request_id)
 

	
 
        deleted = user_log.repository is None
 
        if deleted:
 
            repo_name = user_log.repository_name
 
        else:
 
            repo_name = user_log.repository.repo_name
 

	
 
        return link_to(_('Pull request %s') % nice_id,
 
                    url('pullrequest_show', repo_name=repo_name,
 
                    pull_request_id=pull_request_id))
 

	
 
    def get_archive_name():
 
        archive_name = action_params
 
        return archive_name
 

	
 
    # action : translated str, callback(extractor), icon
 
    action_map = {
 
        'user_deleted_repo':           (_('[deleted] repository'),
 
                                        None, 'icon-trashcan'),
 
        'user_created_repo':           (_('[created] repository'),
 
                                        None, 'icon-plus'),
 
        'user_created_fork':           (_('[created] repository as fork'),
 
                                        None, 'icon-fork'),
 
        'user_forked_repo':            (_('[forked] repository'),
 
                                        get_fork_name, 'icon-fork'),
 
        'user_updated_repo':           (_('[updated] repository'),
 
                                        None, 'icon-pencil'),
 
        'user_downloaded_archive':      (_('[downloaded] archive from repository'),
 
                                        get_archive_name, 'icon-download-cloud'),
 
        'admin_deleted_repo':          (_('[delete] repository'),
 
                                        None, 'icon-trashcan'),
 
        'admin_created_repo':          (_('[created] repository'),
 
                                        None, 'icon-plus'),
 
        'admin_forked_repo':           (_('[forked] repository'),
 
                                        None, 'icon-fork'),
 
        'admin_updated_repo':          (_('[updated] repository'),
 
                                        None, 'icon-pencil'),
 
        'admin_created_user':          (_('[created] user'),
 
                                        get_user_name, 'icon-user'),
 
        'admin_updated_user':          (_('[updated] user'),
 
                                        get_user_name, 'icon-user'),
 
        'admin_created_users_group':   (_('[created] user group'),
 
                                        get_users_group, 'icon-pencil'),
 
        'admin_updated_users_group':   (_('[updated] user group'),
 
                                        get_users_group, 'icon-pencil'),
 
        'user_commented_revision':     (_('[commented] on revision in repository'),
 
                                        get_cs_links, 'icon-comment'),
 
        'user_commented_pull_request': (_('[commented] on pull request for'),
 
                                        get_pull_request, 'icon-comment'),
 
        'user_closed_pull_request':    (_('[closed] pull request for'),
 
                                        get_pull_request, 'icon-ok'),
 
        'push':                        (_('[pushed] into'),
 
                                        get_cs_links, 'icon-move-up'),
 
        'push_local':                  (_('[committed via Kallithea] into repository'),
 
                                        get_cs_links, 'icon-pencil'),
 
        'push_remote':                 (_('[pulled from remote] into repository'),
 
                                        get_cs_links, 'icon-move-up'),
 
        'pull':                        (_('[pulled] from'),
 
                                        None, 'icon-move-down'),
 
        'started_following_repo':      (_('[started following] repository'),
 
                                        None, 'icon-heart'),
 
        'stopped_following_repo':      (_('[stopped following] repository'),
 
                                        None, 'icon-heart-empty'),
 
    }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0] \
 
            .replace('[', '<b>') \
 
            .replace(']', '</b>')
 

	
 
    action_params_func = lambda: ""
 

	
 
    if callable(action_str[1]):
 
        action_params_func = action_str[1]
 

	
 
    def action_parser_icon():
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        html = """<i class="%s"></i>""" % ico
 
        return literal(html)
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 
def gravatar_div(email_address, cls='', size=30, **div_attributes):
 
    """Return an html literal with a span around a gravatar if they are enabled.
 
    Extra keyword parameters starting with 'div_' will get the prefix removed
 
    and '_' changed to '-' and be used as attributes on the div. The default
 
    class is 'gravatar'.
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 
    if 'div_class' not in div_attributes:
 
        div_attributes['div_class'] = "gravatar"
 
    attributes = []
 
    for k, v in sorted(div_attributes.items()):
 
        assert k.startswith('div_'), k
 
        attributes.append(' %s="%s"' % (k[4:].replace('_', '-'), escape(v)))
 
    return literal("""<span%s>%s</span>""" %
 
                   (''.join(attributes),
 
                    gravatar(email_address, cls=cls, size=size)))
 

	
 

	
 
def gravatar(email_address, cls='', size=30):
 
    """return html element of the gravatar
 

	
 
    This method will return an <img> with the resolution double the size (for
 
    retina screens) of the image. If the url returned from gravatar_url is
 
    empty then we fallback to using an icon.
 

	
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 

	
 
    src = gravatar_url(email_address, size * 2)
 

	
 
    if src:
 
        # here it makes sense to use style="width: ..." (instead of, say, a
 
        # stylesheet) because we using this to generate a high-res (retina) size
 
        html = ('<i class="icon-gravatar {cls}"'
 
                ' style="font-size: {size}px;background-size: {size}px;background-image: url(\'{src}\')"'
 
                '></i>').format(cls=cls, size=size, src=src)
 

	
 
    else:
 
        # if src is empty then there was no gravatar, so we use a font icon
 
        html = ("""<i class="icon-user {cls}" style="font-size: {size}px;"></i>"""
 
            .format(cls=cls, size=size, src=src))
 

	
 
    return literal(html)
 

	
 

	
 
def gravatar_url(email_address, size=30, default=''):
 
    # doh, we need to re-import those to mock it later
 
    from kallithea.config.routing import url
 
    from kallithea.model.db import User
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ""
 

	
 
    _def = 'anonymous@kallithea-scm.org'  # default gravatar
 
    email_address = email_address or _def
 

	
 
    if email_address == _def:
 
        return default
 

	
 
    parsed_url = urlparse.urlparse(url.current(qualified=True))
 
    parsed_url = urllib.parse.urlparse(url.current(qualified=True))
 
    url = (c.visual.gravatar_url or User.DEFAULT_GRAVATAR_URL) \
 
               .replace('{email}', email_address) \
 
               .replace('{md5email}', hashlib.md5(safe_bytes(email_address).lower()).hexdigest()) \
 
               .replace('{netloc}', parsed_url.netloc) \
 
               .replace('{scheme}', parsed_url.scheme) \
 
               .replace('{size}', str(size))
 
    return url
 

	
 

	
 
def changed_tooltip(nodes):
 
    """
 
    Generates a html string for changed nodes in changeset page.
 
    It limits the output to 30 entries
 

	
 
    :param nodes: LazyNodesGenerator
 
    """
 
    if nodes:
 
        pref = ': <br/> '
 
        suf = ''
 
        if len(nodes) > 30:
 
            suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
 
        return literal(pref + '<br/> '.join([safe_unicode(x.path)
 
                                             for x in nodes[:30]]) + suf)
 
    else:
 
        return ': ' + _('No files')
 

	
 

	
 
def fancy_file_stats(stats):
 
    """
 
    Displays a fancy two colored bar for number of added/deleted
 
    lines of code on file
 

	
 
    :param stats: two element list of added/deleted lines of code
 
    """
 
    from kallithea.lib.diffs import NEW_FILENODE, DEL_FILENODE, \
 
        MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE
 

	
 
    a, d = stats['added'], stats['deleted']
 
    width = 100
 

	
 
    if stats['binary']:
 
        # binary mode
 
        lbl = ''
 
        bin_op = 1
 

	
 
        if BIN_FILENODE in stats['ops']:
 
            lbl = 'bin+'
 

	
 
        if NEW_FILENODE in stats['ops']:
 
            lbl += _('new file')
 
            bin_op = NEW_FILENODE
 
        elif MOD_FILENODE in stats['ops']:
 
            lbl += _('mod')
 
            bin_op = MOD_FILENODE
 
        elif DEL_FILENODE in stats['ops']:
 
            lbl += _('del')
 
            bin_op = DEL_FILENODE
 
        elif RENAMED_FILENODE in stats['ops']:
 
            lbl += _('rename')
 
            bin_op = RENAMED_FILENODE
 

	
 
        # chmod can go with other operations
 
        if CHMOD_FILENODE in stats['ops']:
 
            _org_lbl = _('chmod')
 
            lbl += _org_lbl if lbl.endswith('+') else '+%s' % _org_lbl
 

	
 
        #import ipdb;ipdb.set_trace()
 
        b_d = '<div class="bin bin%s progress-bar" style="width:100%%">%s</div>' % (bin_op, lbl)
 
        b_a = '<div class="bin bin1" style="width:0%"></div>'
 
        return literal('<div style="width:%spx" class="progress">%s%s</div>' % (width, b_a, b_d))
 

	
 
    t = stats['added'] + stats['deleted']
 
    unit = float(width) / (t or 1)
 

	
 
    # needs > 9% of width to be visible or 0 to be hidden
 
    a_p = max(9, unit * a) if a > 0 else 0
 
    d_p = max(9, unit * d) if d > 0 else 0
 
    p_sum = a_p + d_p
 

	
 
    if p_sum > width:
 
        # adjust the percentage to be == 100% since we adjusted to 9
 
        if a_p > d_p:
 
            a_p = a_p - (p_sum - width)
 
        else:
 
            d_p = d_p - (p_sum - width)
 

	
 
    a_v = a if a > 0 else ''
 
    d_v = d if d > 0 else ''
 

	
 
    d_a = '<div class="added progress-bar" style="width:%s%%">%s</div>' % (
 
        a_p, a_v
 
    )
 
    d_d = '<div class="deleted progress-bar" style="width:%s%%">%s</div>' % (
 
        d_p, d_v
 
    )
 
    return literal('<div class="progress" style="width:%spx">%s%s</div>' % (width, d_a, d_d))
 

	
 

	
 
_URLIFY_RE = re.compile(r'''
 
# URL markup
 
(?P<url>%s) |
 
# @mention markup
 
(?P<mention>%s) |
 
# Changeset hash markup
 
(?<!\w|[-_])
 
  (?P<hash>[0-9a-f]{12,40})
 
(?!\w|[-_]) |
 
# Markup of *bold text*
 
(?:
 
  (?:^|(?<=\s))
 
  (?P<bold> [*] (?!\s) [^*\n]* (?<!\s) [*] )
 
  (?![*\w])
 
) |
 
# "Stylize" markup
 
\[see\ \=&gt;\ *(?P<seen>[a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\] |
 
\[license\ \=&gt;\ *(?P<license>[a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\] |
 
\[(?P<tagtype>requires|recommends|conflicts|base)\ \=&gt;\ *(?P<tagvalue>[a-zA-Z0-9\-\/]*)\] |
 
\[(?:lang|language)\ \=&gt;\ *(?P<lang>[a-zA-Z\-\/\#\+]*)\] |
 
\[(?P<tag>[a-z]+)\]
 
''' % (url_re.pattern, MENTIONS_REGEX.pattern),
 
    re.VERBOSE | re.MULTILINE | re.IGNORECASE)
 

	
 

	
 
def urlify_text(s, repo_name=None, link_=None, truncate=None, stylize=False, truncatef=truncate):
 
    """
 
    Parses given text message and make literal html with markup.
 
    The text will be truncated to the specified length.
 
    Hashes are turned into changeset links to specified repository.
 
    URLs links to what they say.
 
    Issues are linked to given issue-server.
 
    If link_ is provided, all text not already linking somewhere will link there.
 
    """
 

	
 
    def _replace(match_obj):
 
        url = match_obj.group('url')
 
        if url is not None:
 
            return '<a href="%(url)s">%(url)s</a>' % {'url': url}
 
        mention = match_obj.group('mention')
 
        if mention is not None:
 
            return '<b>%s</b>' % mention
 
        hash_ = match_obj.group('hash')
 
        if hash_ is not None and repo_name is not None:
 
            from kallithea.config.routing import url  # doh, we need to re-import url to mock it later
 
            return '<a class="changeset_hash" href="%(url)s">%(hash)s</a>' % {
 
                 'url': url('changeset_home', repo_name=repo_name, revision=hash_),
 
                 'hash': hash_,
 
                }
 
        bold = match_obj.group('bold')
 
        if bold is not None:
 
            return '<b>*%s*</b>' % _urlify(bold[1:-1])
 
        if stylize:
 
            seen = match_obj.group('seen')
 
            if seen:
 
                return '<div class="label label-meta" data-tag="see">see =&gt; %s</div>' % seen
 
            license = match_obj.group('license')
 
            if license:
 
                return '<div class="label label-meta" data-tag="license"><a href="http://www.opensource.org/licenses/%s">%s</a></div>' % (license, license)
 
            tagtype = match_obj.group('tagtype')
 
            if tagtype:
 
                tagvalue = match_obj.group('tagvalue')
 
                return '<div class="label label-meta" data-tag="%s">%s =&gt; <a href="/%s">%s</a></div>' % (tagtype, tagtype, tagvalue, tagvalue)
 
            lang = match_obj.group('lang')
 
            if lang:
 
                return '<div class="label label-meta" data-tag="lang">%s</div>' % lang
 
            tag = match_obj.group('tag')
 
            if tag:
 
                return '<div class="label label-meta" data-tag="%s">%s</div>' % (tag, tag)
 
        return match_obj.group(0)
 

	
 
    def _urlify(s):
 
        """
 
        Extract urls from text and make html links out of them
 
        """
 
        return _URLIFY_RE.sub(_replace, s)
 

	
 
    if truncate is None:
 
        s = s.rstrip()
 
    else:
 
        s = truncatef(s, truncate, whole_word=True)
 
    s = html_escape(s)
 
    s = _urlify(s)
 
    if repo_name is not None:
 
        s = urlify_issues(s, repo_name)
 
    if link_ is not None:
 
        # make href around everything that isn't a href already
 
        s = linkify_others(s, link_)
 
    s = s.replace('\r\n', '<br/>').replace('\n', '<br/>')
 
    # Turn HTML5 into more valid HTML4 as required by some mail readers.
 
    # (This is not done in one step in html_escape, because character codes like
 
    # &#123; risk to be seen as an issue reference due to the presence of '#'.)
 
    s = s.replace("&apos;", "&#39;")
 
    return literal(s)
kallithea/tests/functional/test_login.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import re
 
import time
 
import urlparse
 
import urllib.parse
 

	
 
import mock
 
from tg.util.webtest import test_context
 

	
 
import kallithea.lib.celerylib.tasks
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import check_password
 
from kallithea.lib.utils2 import generate_api_key
 
from kallithea.model import validators
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model.db import User
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestLoginController(base.TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(base.url(controller='login', action='index'))
 
        assert response.status == '200 OK'
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': base.TEST_USER_ADMIN_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, base.TEST_USER_ADMIN_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % base.HG_REPO)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, base.TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % base.HG_REPO)
 

	
 
    def test_login_regular_email_ok(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_EMAIL,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, base.TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % base.HG_REPO)
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(base.url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': base.TEST_USER_ADMIN_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '302 Found'
 
        response = response.follow()
 

	
 
        assert response.status == '200 OK'
 
        response.mustcontain('Users Administration')
 

	
 
    def test_login_do_not_remember(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  'remember': False,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        assert 'Set-Cookie' in response.headers
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            assert not re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r has expiration date, but should be a session cookie' % cookie
 

	
 
    def test_login_remember(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  'remember': True,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        assert 'Set-Cookie' in response.headers
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            assert re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r should have expiration date, but is a session cookie' % cookie
 

	
 
    def test_logout(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        # Verify that a login session has been established.
 
        response = self.app.get(base.url(controller='login', action='index'))
 
        response = response.follow()
 
        assert 'authuser' in response.session
 

	
 
        response.click('Log Out')
 

	
 
        # Verify that the login session has been terminated.
 
        response = self.app.get(base.url(controller='login', action='index'))
 
        assert 'authuser' not in response.session
 

	
 
    @base.parametrize('url_came_from', [
 
          ('data:text/html,<script>window.alert("xss")</script>',),
 
          ('mailto:test@example.com',),
 
          ('file:///etc/passwd',),
 
          ('ftp://ftp.example.com',),
 
          ('http://other.example.com/bl%C3%A5b%C3%A6rgr%C3%B8d',),
 
          ('//evil.example.com/',),
 
          ('/\r\nX-Header-Injection: boo',),
 
          ('/invälid_url_bytes',),
 
          ('non-absolute-path',),
 
    ])
 
    def test_login_bad_came_froms(self, url_came_from):
 
        response = self.app.post(base.url(controller='login', action='index',
 
                                     came_from=url_came_from),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': base.TEST_USER_ADMIN_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()},
 
                                 status=400)
 

	
 
    def test_login_short_password(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'as',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '200 OK'
 

	
 
        response.mustcontain('Enter 3 characters or more')
 

	
 
    def test_login_wrong_username_password(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': 'error',
 
                                  'password': 'test12',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('Invalid username or password')
 

	
 
    def test_login_non_ascii(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': 'blåbærgrød',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('>Invalid username or password<')
 

	
 
    # verify that get arguments are correctly passed along login redirection
 

	
 
    @base.parametrize('args', [
 
        {'foo':'one', 'bar':'two'},
 
        {'blue': u'blå', 'green': u'grøn'},
 
    ])
 
    def test_redirection_to_login_form_preserves_get_args(self, args):
 
        with fixture.anon_access(False):
 
            response = self.app.get(base.url(controller='summary', action='index',
 
                                        repo_name=base.HG_REPO,
 
                                        **args))
 
            assert response.status == '302 Found'
 
            came_from = urlparse.parse_qs(urlparse.urlparse(response.location).query)['came_from'][0]
 
            came_from_qs = urlparse.parse_qsl(urlparse.urlparse(came_from).query)
 
            came_from = urllib.parse.parse_qs(urllib.parse.urlparse(response.location).query)['came_from'][0]
 
            came_from_qs = urllib.parse.parse_qsl(urllib.parse.urlparse(came_from).query)
 
            assert sorted(came_from_qs) == sorted((k, v.encode('utf-8')) for k, v in args.items())
 

	
 
    @base.parametrize('args,args_encoded', [
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå', 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_preserves_get_args(self, args, args_encoded):
 
        response = self.app.get(base.url(controller='login', action='index',
 
                                    came_from=base.url('/_admin/users', **args)))
 
        came_from = urlparse.parse_qs(urlparse.urlparse(response.form.action).query)['came_from'][0]
 
        came_from = urllib.parse.parse_qs(urllib.parse.urlparse(response.form.action).query)['came_from'][0]
 
        for encoded in args_encoded:
 
            assert encoded in came_from
 

	
 
    @base.parametrize('args,args_encoded', [
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå', 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_redirection_after_successful_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(base.url(controller='login', action='index',
 
                                     came_from=base.url('/_admin/users', **args)),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': base.TEST_USER_ADMIN_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '302 Found'
 
        for encoded in args_encoded:
 
            assert encoded in response.location
 

	
 
    @base.parametrize('args,args_encoded', [
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå', 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_after_incorrect_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(base.url(controller='login', action='index',
 
                                     came_from=base.url('/_admin/users', **args)),
 
                                 {'username': 'error',
 
                                  'password': 'test12',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('Invalid username or password')
 
        came_from = urlparse.parse_qs(urlparse.urlparse(response.form.action).query)['came_from'][0]
 
        came_from = urllib.parse.parse_qs(urllib.parse.urlparse(response.form.action).query)['came_from'][0]
 
        for encoded in args_encoded:
 
            assert encoded in came_from
 

	
 
    #==========================================================================
 
    # REGISTRATIONS
 
    #==========================================================================
 
    def test_register(self):
 
        response = self.app.get(base.url(controller='login', action='register'))
 
        response.mustcontain('Sign Up')
 

	
 
    def test_register_err_same_username(self):
 
        uname = base.TEST_USER_ADMIN_LOGIN
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': uname,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmail@example.com',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        with test_context(self.app):
 
            msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': uname})
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'test_admin_0',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': base.TEST_USER_ADMIN_EMAIL,
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        with test_context(self.app):
 
            msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email_case_sensitive(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'test_admin_1',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': base.TEST_USER_ADMIN_EMAIL.title(),
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        with test_context(self.app):
 
            msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_wrong_data(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': 'test',
 
                                             'password_confirmation': 'test',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '200 OK'
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Enter a value 6 characters long or more')
 

	
 
    def test_register_err_username(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'error user',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Username may only contain '
 
                'alphanumeric characters underscores, '
 
                'periods or dashes and must begin with an '
 
                'alphanumeric character')
 

	
 
    def test_register_err_case_sensitive(self):
 
        usr = base.TEST_USER_ADMIN_LOGIN.title()
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': usr,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        with test_context(self.app):
 
            msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': usr})
 
        response.mustcontain(msg)
 

	
 
    def test_register_special_chars(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                        {'username': 'xxxaxn',
 
                                         'password': 'ąćźżąśśśś',
 
                                         'password_confirmation': 'ąćźżąśśśś',
 
                                         'email': 'goodmailm@test.plx',
 
                                         'firstname': 'test',
 
                                         'lastname': 'test',
 
                                         '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        with test_context(self.app):
 
            msg = validators.ValidPassword()._messages['invalid_password']
 
        response.mustcontain(msg)
 

	
 
    def test_register_password_mismatch(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': '123qwe',
 
                                             'password_confirmation': 'qwe123',
 
                                             'email': 'goodmailm@test.plxa',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        with test_context(self.app):
 
            msg = validators.ValidPasswordsMatch('password', 'password_confirmation')._messages['password_mismatch']
 
        response.mustcontain(msg)
 

	
 
    def test_register_ok(self):
 
        username = 'test_regular4'
 
        password = 'qweqwe'
 
        email = 'user4@example.com'
 
        name = 'testname'
 
        lastname = 'testlastname'
 

	
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': username,
 
                                             'password': password,
 
                                             'password_confirmation': password,
 
                                             'email': email,
 
                                             'firstname': name,
 
                                             'lastname': lastname,
 
                                             'admin': True,
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})  # This should be overridden
 
        assert response.status == '302 Found'
 
        self.checkSessionFlash(response, 'You have successfully registered with Kallithea')
 

	
 
        ret = Session().query(User).filter(User.username == 'test_regular4').one()
 
        assert ret.username == username
 
        assert check_password(password, ret.password) == True
 
        assert ret.email == email
 
        assert ret.name == name
 
        assert ret.lastname == lastname
 
        assert ret.api_key is not None
 
        assert ret.admin == False
 

	
 
    #==========================================================================
 
    # PASSWORD RESET
 
    #==========================================================================
 

	
 
    def test_forgot_password_wrong_mail(self):
 
        bad_email = 'username%wrongmail.org'
 
        response = self.app.post(
 
                        base.url(controller='login', action='password_reset'),
 
                            {'email': bad_email,
 
                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('An email address must contain a single @')
 

	
 
    def test_forgot_password(self):
 
        response = self.app.get(base.url(controller='login',
 
                                    action='password_reset'))
 
        assert response.status == '200 OK'
 

	
 
        username = 'test_password_reset_1'
 
        password = 'qweqwe'
 
        email = 'username@example.com'
 
        name = u'passwd'
 
        lastname = u'reset'
 
        timestamp = int(time.time())
 

	
 
        new = User()
 
        new.username = username
 
        new.password = password
 
        new.email = email
 
        new.name = name
 
        new.lastname = lastname
 
        new.api_key = generate_api_key()
 
        Session().add(new)
 
        Session().commit()
 

	
 
        token = UserModel().get_reset_password_token(
 
            User.get_by_username(username), timestamp, self.session_csrf_secret_token())
 

	
 
        collected = []
 
        def mock_send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
            collected.append((recipients, subject, body, html_body))
 

	
 
        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', mock_send_email), \
 
                mock.patch.object(time, 'time', lambda: timestamp):
 
            response = self.app.post(base.url(controller='login',
 
                                         action='password_reset'),
 
                                     {'email': email,
 
                                      '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        self.checkSessionFlash(response, 'A password reset confirmation code has been sent')
 

	
 
        ((recipients, subject, body, html_body),) = collected
 
        assert recipients == ['username@example.com']
 
        assert subject == 'Password reset link'
 
        assert '\n%s\n' % token in body
 
        (confirmation_url,) = (line for line in body.splitlines() if line.startswith('http://'))
 
        assert ' href="%s"' % confirmation_url.replace('&', '&amp;').replace('@', '%40') in html_body
 

	
 
        d = urlparse.parse_qs(urlparse.urlparse(confirmation_url).query)
 
        d = urllib.parse.parse_qs(urllib.parse.urlparse(confirmation_url).query)
 
        assert d['token'] == [token]
 
        assert d['timestamp'] == [str(timestamp)]
 
        assert d['email'] == [email]
 

	
 
        response = response.follow()
 

	
 
        # BAD TOKEN
 

	
 
        bad_token = "bad"
 

	
 
        response = self.app.post(base.url(controller='login',
 
                                     action='password_reset_confirmation'),
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': bad_token,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 })
 
        assert response.status == '200 OK'
 
        response.mustcontain('Invalid password reset token')
 

	
 
        # GOOD TOKEN
 

	
 
        response = self.app.get(confirmation_url)
 
        assert response.status == '200 OK'
 
        response.mustcontain("You are about to set a new password for the email address %s" % email)
 
        response.mustcontain('<form action="%s" method="post">' % base.url(controller='login', action='password_reset_confirmation'))
 
        response.mustcontain('value="%s"' % self.session_csrf_secret_token())
 
        response.mustcontain('value="%s"' % token)
 
        response.mustcontain('value="%s"' % timestamp)
 
        response.mustcontain('value="username@example.com"')
 

	
 
        # fake a submit of that form
 
        response = self.app.post(base.url(controller='login',
 
                                     action='password_reset_confirmation'),
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': token,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 })
 
        assert response.status == '302 Found'
 
        self.checkSessionFlash(response, 'Successfully updated password')
 

	
 
        response = response.follow()
 

	
 
    #==========================================================================
 
    # API
 
    #==========================================================================
 

	
 
    def _api_key_test(self, api_key, status):
 
        """Verifies HTTP status code for accessing an auth-requiring page,
 
        using the given api_key URL parameter as well as using the API key
 
        with bearer authentication.
 

	
 
        If api_key is None, no api_key is passed at all. If api_key is True,
 
        a real, working API key is used.
 
        """
 
        with fixture.anon_access(False):
 
            if api_key is None:
 
                params = {}
 
                headers = {}
 
            else:
 
                if api_key is True:
 
                    api_key = User.get_first_admin().api_key
 
                params = {'api_key': api_key}
 
                headers = {'Authorization': 'Bearer ' + str(api_key)}
 

	
 
            self.app.get(base.url(controller='changeset', action='changeset_raw',
 
                             repo_name=base.HG_REPO, revision='tip', **params),
 
                         status=status)
 

	
 
            self.app.get(base.url(controller='changeset', action='changeset_raw',
 
                             repo_name=base.HG_REPO, revision='tip'),
 
                         headers=headers,
 
                         status=status)
 

	
 
    @base.parametrize('test_name,api_key,code', [
 
        ('none', None, 302),
 
        ('empty_string', '', 403),
 
        ('fake_number', '123456', 403),
 
        ('fake_not_alnum', 'a-z', 403),
 
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 403),
 
        ('proper_api_key', True, 200)
 
    ])
 
    def test_access_page_via_api_key(self, test_name, api_key, code):
 
        self._api_key_test(api_key, code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        new_api_key = ApiKeyModel().create(base.TEST_USER_ADMIN_LOGIN, u'test')
 
        Session().commit()
 
        self._api_key_test(new_api_key.api_key, status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        new_api_key = ApiKeyModel().create(base.TEST_USER_ADMIN_LOGIN, u'test')
 
        Session().commit()
 
        # patch the API key and make it expired
 
        new_api_key.expires = 0
 
        Session().commit()
 
        self._api_key_test(new_api_key.api_key, status=403)
kallithea/tests/scripts/manual_test_crawler.py
Show inline comments
 
#!/usr/bin/env python3
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.scripts.manual_test_crawler
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Test for crawling a project for memory usage
 
This should be runned just as regular script together
 
with a watch script that will show memory usage.
 

	
 
watch -n1 ./kallithea/tests/mem_watch
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import cookielib
 
import http.cookiejar
 
import os
 
import sys
 
import tempfile
 
import time
 
import urllib
 
import urllib2
 
from os.path import dirname
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib.vcs.exceptions import RepositoryError
 

	
 

	
 
__here__ = os.path.abspath(__file__)
 
__root__ = dirname(dirname(dirname(__here__)))
 
sys.path.append(__root__)
 

	
 

	
 
PASES = 3
 
HOST = 'http://127.0.0.1'
 
PORT = 5000
 
BASE_URI = '%s:%s/' % (HOST, PORT)
 

	
 
if len(sys.argv) == 2:
 
    BASE_URI = sys.argv[1]
 

	
 
if not BASE_URI.endswith('/'):
 
    BASE_URI += '/'
 

	
 
print('Crawling @ %s' % BASE_URI)
 
BASE_URI += '%s'
 
PROJECT_PATH = os.path.join('/', 'home', 'username', 'repos')
 
PROJECTS = [
 
    # 'linux-magx-pbranch',
 
    'CPython',
 
    'kallithea',
 
]
 

	
 

	
 
cj = cookielib.FileCookieJar(os.path.join(tempfile.gettempdir(), 'rc_test_cookie.txt'))
 
cj = http.cookiejar.FileCookieJar(os.path.join(tempfile.gettempdir(), 'rc_test_cookie.txt'))
 
o = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
 
o.addheaders = [
 
    ('User-agent', 'kallithea-crawler'),
 
    ('Accept-Language', 'en - us, en;q = 0.5')
 
]
 

	
 
urllib2.install_opener(o)
 

	
 

	
 
def _get_repo(proj):
 
    if isinstance(proj, str):
 
        repo = vcs.get_repo(os.path.join(PROJECT_PATH, proj))
 
        proj = proj
 
    else:
 
        repo = proj
 
        proj = repo.name
 

	
 
    return repo, proj
 

	
 

	
 
def test_changelog_walk(proj, pages=100):
 
    repo, proj = _get_repo(proj)
 

	
 
    total_time = 0
 
    for i in range(1, pages):
 

	
 
        page = '/'.join((proj, 'changelog',))
 

	
 
        full_uri = (BASE_URI % page) + '?' + urllib.urlencode({'page': i})
 
        s = time.time()
 
        f = o.open(full_uri)
 

	
 
        assert f.url == full_uri, 'URL:%s does not match %s' % (f.url, full_uri)
 

	
 
        size = len(f.read())
 
        e = time.time() - s
 
        total_time += e
 
        print('visited %s size:%s req:%s ms' % (full_uri, size, e))
 

	
 
    print('total_time', total_time)
 
    print('average on req', total_time / float(pages))
 

	
 

	
 
def test_changeset_walk(proj, limit=None):
 
    repo, proj = _get_repo(proj)
 

	
 
    print('processing', os.path.join(PROJECT_PATH, proj))
 
    total_time = 0
 

	
 
    cnt = 0
 
    for i in repo:
 
        cnt += 1
 
        raw_cs = '/'.join((proj, 'changeset', i.raw_id))
 
        if limit and limit == cnt:
 
            break
 

	
 
        full_uri = (BASE_URI % raw_cs)
 
        print('%s visiting %s/%s' % (cnt, full_uri, i))
 
        s = time.time()
 
        f = o.open(full_uri)
 
        size = len(f.read())
 
        e = time.time() - s
 
        total_time += e
 
        print('%s visited %s/%s size:%s req:%s ms' % (cnt, full_uri, i, size, e))
 

	
 
    print('total_time', total_time)
 
    print('average on req', total_time / float(cnt))
 

	
 

	
 
def test_files_walk(proj, limit=100):
 
    repo, proj = _get_repo(proj)
 

	
 
    print('processing', os.path.join(PROJECT_PATH, proj))
 
    total_time = 0
 

	
 
    paths_ = OrderedSet([''])
 
    try:
 
        tip = repo.get_changeset('tip')
 
        for topnode, dirs, files in tip.walk('/'):
 

	
 
            for dir in dirs:
 
                paths_.add(dir.path)
 
                for f in dir:
 
                    paths_.add(f.path)
 

	
 
            for f in files:
 
                paths_.add(f.path)
 

	
 
    except RepositoryError as e:
 
        pass
 

	
 
    cnt = 0
 
    for f in paths_:
 
        cnt += 1
 
        if limit and limit == cnt:
 
            break
 

	
 
        file_path = '/'.join((proj, 'files', 'tip', f))
 
        full_uri = (BASE_URI % file_path)
 
        print('%s visiting %s' % (cnt, full_uri))
 
        s = time.time()
 
        f = o.open(full_uri)
 
        size = len(f.read())
 
        e = time.time() - s
 
        total_time += e
 
        print('%s visited OK size:%s req:%s ms' % (cnt, size, e))
 

	
 
    print('total_time', total_time)
 
    print('average on req', total_time / float(cnt))
 

	
 
if __name__ == '__main__':
 
    for path in PROJECTS:
 
        repo = vcs.get_repo(os.path.join(PROJECT_PATH, path))
 
        for i in range(PASES):
 
            print('PASS %s/%s' % (i, PASES))
 
            test_changelog_walk(repo, pages=80)
 
            test_changeset_walk(repo, limit=100)
 
            test_files_walk(repo, limit=100)
0 comments (0 inline, 0 general)