Changeset - 4f03bd5ac2f2
[Not reviewed]
default
0 11 0
Mads Kiilerich - 6 years ago 2019-12-24 04:13:48
mads@kiilerich.com
Grafted from: cd30f0fb8046
lib: handle both HTML, unsafe strings, and exceptions passed to helpers.flash()

Before, h.flash would trust any input to contain html ... and callers would
convert exceptions to string, often with a simple str() or unicode() ... which
really didn't deserve to be trusted.

Instead, only trust messages that have a __html__ and escape anything else ...
but also apply str/unicode on the parameter so the caller doesn't have to but
*can* pass an exception directly.
11 files changed with 33 insertions and 33 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/repos.py
Show inline comments
 
@@ -282,264 +282,264 @@ class ReposController(BaseRepoController
 
            Session().commit()
 
        except AttachedForksError:
 
            h.flash(_('Cannot delete repository %s which still has forks')
 
                        % repo_name, category='warning')
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during deletion of %s') % repo_name,
 
                    category='error')
 

	
 
        if repo.group:
 
            raise HTTPFound(location=url('repos_group_home', group_name=repo.group.group_name))
 
        raise HTTPFound(location=url('repos'))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit(self, repo_name):
 
        defaults = self.__load_data()
 
        c.repo_fields = RepositoryField.query() \
 
            .filter(RepositoryField.repository == c.repo_info).all()
 
        c.active = 'settings'
 
        return htmlfill.render(
 
            render('admin/repos/repo_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_permissions(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        c.active = 'permissions'
 
        defaults = RepoModel()._get_defaults(repo_name)
 

	
 
        return htmlfill.render(
 
            render('admin/repos/repo_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_permissions_update(self, repo_name):
 
        form = RepoPermsForm()().to_python(request.POST)
 
        RepoModel()._update_permissions(repo_name, form['perms_new'],
 
                                        form['perms_updates'])
 
        # TODO: implement this
 
        #action_logger(request.authuser, 'admin_changed_repo_permissions',
 
        #              repo_name, request.ip_addr)
 
        Session().commit()
 
        h.flash(_('Repository permissions updated'), category='success')
 
        raise HTTPFound(location=url('edit_repo_perms', repo_name=repo_name))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_permissions_revoke(self, repo_name):
 
        try:
 
            obj_type = request.POST.get('obj_type')
 
            obj_id = None
 
            if obj_type == 'user':
 
                obj_id = safe_int(request.POST.get('user_id'))
 
            elif obj_type == 'user_group':
 
                obj_id = safe_int(request.POST.get('user_group_id'))
 
            else:
 
                assert False
 

	
 
            if obj_type == 'user':
 
                RepoModel().revoke_user_permission(repo=repo_name, user=obj_id)
 
            elif obj_type == 'user_group':
 
                RepoModel().revoke_user_group_permission(
 
                    repo=repo_name, group_name=obj_id
 
                )
 
            else:
 
                assert False
 
            # TODO: implement this
 
            #action_logger(request.authuser, 'admin_revoked_repo_permissions',
 
            #              repo_name, request.ip_addr)
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during revoking of permission'),
 
                    category='error')
 
            raise HTTPInternalServerError()
 
        return []
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_fields(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        c.repo_fields = RepositoryField.query() \
 
            .filter(RepositoryField.repository == c.repo_info).all()
 
        c.active = 'fields'
 
        if request.POST:
 

	
 
            raise HTTPFound(location=url('repo_edit_fields'))
 
        return render('admin/repos/repo_edit.html')
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def create_repo_field(self, repo_name):
 
        try:
 
            form_result = RepoFieldForm()().to_python(dict(request.POST))
 
            new_field = RepositoryField()
 
            new_field.repository = Repository.get_by_repo_name(repo_name)
 
            new_field.field_key = form_result['new_field_key']
 
            new_field.field_type = form_result['new_field_type']  # python type
 
            new_field.field_value = form_result['new_field_value']  # set initial blank value
 
            new_field.field_desc = form_result['new_field_desc']
 
            new_field.field_label = form_result['new_field_label']
 
            Session().add(new_field)
 
            Session().commit()
 
        except formencode.Invalid as e:
 
            h.flash(_('Field validation error: %s') % e.msg, category='error')
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during creation of field: %r') % e, category='error')
 
        raise HTTPFound(location=url('edit_repo_fields', repo_name=repo_name))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def delete_repo_field(self, repo_name, field_id):
 
        field = RepositoryField.get_or_404(field_id)
 
        try:
 
            Session().delete(field)
 
            Session().commit()
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            msg = _('An error occurred during removal of field')
 
            h.flash(msg, category='error')
 
        raise HTTPFound(location=url('edit_repo_fields', repo_name=repo_name))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_advanced(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        c.default_user_id = User.get_default_user().user_id
 
        c.in_public_journal = UserFollowing.query() \
 
            .filter(UserFollowing.user_id == c.default_user_id) \
 
            .filter(UserFollowing.follows_repository == c.repo_info).scalar()
 

	
 
        _repos = Repository.query(sorted=True).all()
 
        read_access_repos = RepoList(_repos, perm_level='read')
 
        c.repos_list = [(None, _('-- Not a fork --'))]
 
        c.repos_list += [(x.repo_id, x.repo_name)
 
                         for x in read_access_repos
 
                         if x.repo_id != c.repo_info.repo_id
 
                         and x.repo_type == c.repo_info.repo_type]
 

	
 
        defaults = {
 
            'id_fork_of': c.repo_info.fork_id if c.repo_info.fork_id else ''
 
        }
 

	
 
        c.active = 'advanced'
 
        if request.POST:
 
            raise HTTPFound(location=url('repo_edit_advanced'))
 
        return htmlfill.render(
 
            render('admin/repos/repo_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_advanced_journal(self, repo_name):
 
        """
 
        Sets this repository to be visible in public journal,
 
        in other words asking default user to follow this repo
 

	
 
        :param repo_name:
 
        """
 

	
 
        try:
 
            repo_id = Repository.get_by_repo_name(repo_name).repo_id
 
            user_id = User.get_default_user().user_id
 
            self.scm_model.toggle_following_repo(repo_id, user_id)
 
            h.flash(_('Updated repository visibility in public journal'),
 
                    category='success')
 
            Session().commit()
 
        except Exception:
 
            h.flash(_('An error occurred during setting this'
 
                      ' repository in public journal'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_repo_advanced', repo_name=repo_name))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_advanced_fork(self, repo_name):
 
        """
 
        Mark given repository as a fork of another
 

	
 
        :param repo_name:
 
        """
 
        try:
 
            fork_id = request.POST.get('id_fork_of')
 
            repo = ScmModel().mark_as_fork(repo_name, fork_id,
 
                                           request.authuser.username)
 
            fork = repo.fork.repo_name if repo.fork else _('Nothing')
 
            Session().commit()
 
            h.flash(_('Marked repository %s as fork of %s') % (repo_name, fork),
 
                    category='success')
 
        except RepositoryError as e:
 
            log.error(traceback.format_exc())
 
            h.flash(str(e), category='error')
 
            h.flash(e, category='error')
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during this operation'),
 
                    category='error')
 

	
 
        raise HTTPFound(location=url('edit_repo_advanced', repo_name=repo_name))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_caches(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        c.active = 'caches'
 
        if request.POST:
 
            try:
 
                ScmModel().mark_for_invalidation(repo_name)
 
                Session().commit()
 
                h.flash(_('Cache invalidation successful'),
 
                        category='success')
 
            except Exception as e:
 
                log.error(traceback.format_exc())
 
                h.flash(_('An error occurred during cache invalidation'),
 
                        category='error')
 

	
 
            raise HTTPFound(location=url('edit_repo_caches', repo_name=c.repo_name))
 
        return render('admin/repos/repo_edit.html')
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_remote(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        c.active = 'remote'
 
        if request.POST:
 
            try:
 
                ScmModel().pull_changes(repo_name, request.authuser.username, request.ip_addr)
 
                h.flash(_('Pulled from remote location'), category='success')
 
            except Exception as e:
 
                log.error(traceback.format_exc())
 
                h.flash(_('An error occurred during pull from remote location'),
 
                        category='error')
 
            raise HTTPFound(location=url('edit_repo_remote', repo_name=c.repo_name))
 
        return render('admin/repos/repo_edit.html')
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_statistics(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        repo = c.repo_info.scm_instance
 

	
 
        if c.repo_info.stats:
 
            # this is on what revision we ended up so we add +1 for count
 
            last_rev = c.repo_info.stats.stat_on_revision + 1
 
        else:
 
            last_rev = 0
 
        c.stats_revision = last_rev
 

	
 
        c.repo_last_rev = repo.count() if repo.revisions else 0
 

	
 
        if last_rev == 0 or c.repo_last_rev == 0:
 
            c.stats_percentage = 0
 
        else:
 
            c.stats_percentage = '%.2f' % ((float((last_rev)) / c.repo_last_rev) * 100)
 

	
 
        c.active = 'statistics'
 
        if request.POST:
 
            try:
 
                RepoModel().delete_stats(repo_name)
 
                Session().commit()
 
            except Exception as e:
 
                log.error(traceback.format_exc())
 
                h.flash(_('An error occurred during deletion of repository stats'),
 
                        category='error')
 
            raise HTTPFound(location=url('edit_repo_statistics', repo_name=c.repo_name))
 

	
 
        return render('admin/repos/repo_edit.html')
kallithea/controllers/changelog.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.changelog
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
changelog controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
from tg import request, session
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPFound, HTTPNotFound
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.config.routing import url
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import safe_int
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, ChangesetError, EmptyRepositoryError, NodeDoesNotExistError, RepositoryError
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ChangelogController(BaseRepoController):
 

	
 
    def _before(self, *args, **kwargs):
 
        super(ChangelogController, self)._before(*args, **kwargs)
 
        c.affected_files_cut_off = 60
 

	
 
    @staticmethod
 
    def __get_cs(rev, repo):
 
        """
 
        Safe way to get changeset. If error occur fail with error message.
 

	
 
        :param rev: revision to fetch
 
        :param repo: repo instance
 
        """
 

	
 
        try:
 
            return c.db_repo_scm_instance.get_changeset(rev)
 
        except EmptyRepositoryError as e:
 
            h.flash(_('There are no changesets yet'), category='error')
 
        except RepositoryError as e:
 
            log.error(traceback.format_exc())
 
            h.flash(unicode(e), category='error')
 
            h.flash(e, category='error')
 
        raise HTTPBadRequest()
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self, repo_name, revision=None, f_path=None):
 
        limit = 2000
 
        default = 100
 
        if request.GET.get('size'):
 
            c.size = max(min(safe_int(request.GET.get('size')), limit), 1)
 
            session['changelog_size'] = c.size
 
            session.save()
 
        else:
 
            c.size = int(session.get('changelog_size', default))
 
        # min size must be 1
 
        c.size = max(c.size, 1)
 
        p = safe_int(request.GET.get('page'), 1)
 
        branch_name = request.GET.get('branch', None)
 
        if (branch_name and
 
            branch_name not in c.db_repo_scm_instance.branches and
 
            branch_name not in c.db_repo_scm_instance.closed_branches and
 
            not revision
 
        ):
 
            raise HTTPFound(location=url('changelog_file_home', repo_name=c.repo_name,
 
                                    revision=branch_name, f_path=f_path or ''))
 

	
 
        if revision == 'tip':
 
            revision = None
 

	
 
        c.changelog_for_path = f_path
 
        try:
 

	
 
            if f_path:
 
                log.debug('generating changelog for path %s', f_path)
 
                # get the history for the file !
 
                tip_cs = c.db_repo_scm_instance.get_changeset()
 
                try:
 
                    collection = tip_cs.get_file_history(f_path)
 
                except (NodeDoesNotExistError, ChangesetError):
 
                    # this node is not present at tip !
 
                    try:
 
                        cs = self.__get_cs(revision, repo_name)
 
                        collection = cs.get_file_history(f_path)
 
                    except RepositoryError as e:
 
                        h.flash(unicode(e), category='warning')
 
                        h.flash(e, category='warning')
 
                        raise HTTPFound(location=h.url('changelog_home', repo_name=repo_name))
 
            else:
 
                collection = c.db_repo_scm_instance.get_changesets(start=0, end=revision,
 
                                                        branch_name=branch_name, reverse=True)
 
            c.total_cs = len(collection)
 

	
 
            c.cs_pagination = Page(collection, page=p, item_count=c.total_cs, items_per_page=c.size,
 
                                   branch=branch_name)
 

	
 
            page_revisions = [x.raw_id for x in c.cs_pagination]
 
            c.cs_comments = c.db_repo.get_comments(page_revisions)
 
            c.cs_statuses = c.db_repo.statuses(page_revisions)
 
        except EmptyRepositoryError as e:
 
            h.flash(unicode(e), category='warning')
 
            h.flash(e, category='warning')
 
            raise HTTPFound(location=url('summary_home', repo_name=c.repo_name))
 
        except (RepositoryError, ChangesetDoesNotExistError, Exception) as e:
 
            log.error(traceback.format_exc())
 
            h.flash(unicode(e), category='error')
 
            h.flash(e, category='error')
 
            raise HTTPFound(location=url('changelog_home', repo_name=c.repo_name))
 

	
 
        c.branch_name = branch_name
 
        c.branch_filters = [('', _('None'))] + \
 
            [(k, k) for k in c.db_repo_scm_instance.branches.keys()]
 
        if c.db_repo_scm_instance.closed_branches:
 
            prefix = _('(closed)') + ' '
 
            c.branch_filters += [('-', '-')] + \
 
                [(k, prefix + k) for k in c.db_repo_scm_instance.closed_branches.keys()]
 
        revs = []
 
        if not f_path:
 
            revs = [x.revision for x in c.cs_pagination]
 
        c.jsdata = graph_data(c.db_repo_scm_instance, revs)
 

	
 
        c.revision = revision # requested revision ref
 
        c.first_revision = c.cs_pagination[0] # pagination is never empty here!
 
        return render('changelog/changelog.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def changelog_details(self, cs):
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            c.cs = c.db_repo_scm_instance.get_changeset(cs)
 
            return render('changelog/changelog_details.html')
 
        raise HTTPNotFound()
kallithea/controllers/files.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.files
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Files controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import os
 
import posixpath
 
import shutil
 
import tempfile
 
import traceback
 
from collections import OrderedDict
 

	
 
from tg import request, response
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound, HTTPNotFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.controllers.changeset import _context_url, _ignorews_url, anchor_url, get_ignore_ws, get_line_ctx
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.exceptions import NonRelativePathError
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import convert_line_endings, detect_mode, safe_int, safe_str, safe_unicode, str2bool
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (
 
    ChangesetDoesNotExistError, ChangesetError, EmptyRepositoryError, ImproperArchiveTypeError, NodeAlreadyExistsError, NodeDoesNotExistError, NodeError, RepositoryError, VCSError)
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FilesController(BaseRepoController):
 

	
 
    def _before(self, *args, **kwargs):
 
        super(FilesController, self)._before(*args, **kwargs)
 

	
 
    def __get_cs(self, rev, silent_empty=False):
 
        """
 
        Safe way to get changeset if error occur it redirects to tip with
 
        proper message
 

	
 
        :param rev: revision to fetch
 
        :silent_empty: return None if repository is empty
 
        """
 

	
 
        try:
 
            return c.db_repo_scm_instance.get_changeset(rev)
 
        except EmptyRepositoryError as e:
 
            if silent_empty:
 
                return None
 
            url_ = url('files_add_home',
 
                       repo_name=c.repo_name,
 
                       revision=0, f_path='', anchor='edit')
 
            add_new = h.link_to(_('Click here to add new file'), url_, class_="alert-link")
 
            h.flash(_('There are no files yet.') + ' ' + add_new, category='warning')
 
            raise HTTPNotFound()
 
        except (ChangesetDoesNotExistError, LookupError):
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(unicode(e), category='error')
 
            h.flash(e, category='error')
 
            raise HTTPNotFound()
 

	
 
    def __get_filenode(self, cs, path):
 
        """
 
        Returns file_node or raise HTTP error.
 

	
 
        :param cs: given changeset
 
        :param path: path to lookup
 
        """
 

	
 
        try:
 
            file_node = cs.get_node(path)
 
            if file_node.is_dir():
 
                raise RepositoryError('given path is a directory')
 
        except ChangesetDoesNotExistError:
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(unicode(e), category='error')
 
            h.flash(e, category='error')
 
            raise HTTPNotFound()
 

	
 
        return file_node
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self, repo_name, revision, f_path, annotate=False):
 
        # redirect to given revision from form if given
 
        post_revision = request.POST.get('at_rev', None)
 
        if post_revision:
 
            cs = self.__get_cs(post_revision) # FIXME - unused!
 

	
 
        c.revision = revision
 
        c.changeset = self.__get_cs(revision)
 
        c.branch = request.GET.get('branch', None)
 
        c.f_path = f_path
 
        c.annotate = annotate
 
        cur_rev = c.changeset.revision
 
        # used in files_source.html:
 
        c.cut_off_limit = self.cut_off_limit
 
        c.fulldiff = request.GET.get('fulldiff')
 

	
 
        # prev link
 
        try:
 
            prev_rev = c.db_repo_scm_instance.get_changeset(cur_rev).prev(c.branch)
 
            c.url_prev = url('files_home', repo_name=c.repo_name,
 
                         revision=prev_rev.raw_id, f_path=f_path)
 
            if c.branch:
 
                c.url_prev += '?branch=%s' % c.branch
 
        except (ChangesetDoesNotExistError, VCSError):
 
            c.url_prev = '#'
 

	
 
        # next link
 
        try:
 
            next_rev = c.db_repo_scm_instance.get_changeset(cur_rev).next(c.branch)
 
            c.url_next = url('files_home', repo_name=c.repo_name,
 
                     revision=next_rev.raw_id, f_path=f_path)
 
            if c.branch:
 
                c.url_next += '?branch=%s' % c.branch
 
        except (ChangesetDoesNotExistError, VCSError):
 
            c.url_next = '#'
 

	
 
        # files or dirs
 
        try:
 
            c.file = c.changeset.get_node(f_path)
 

	
 
            if c.file.is_submodule():
 
                raise HTTPFound(location=c.file.url)
 
            elif c.file.is_file():
 
                c.load_full_history = False
 
                # determine if we're on branch head
 
                _branches = c.db_repo_scm_instance.branches
 
                c.on_branch_head = revision in _branches or revision in _branches.values()
 
                _hist = []
 
                c.file_history = []
 
                if c.load_full_history:
 
                    c.file_history, _hist = self._get_node_history(c.changeset, f_path)
 

	
 
                c.authors = []
 
                for a in set([x.author for x in _hist]):
 
                    c.authors.append((h.email(a), h.person(a)))
 
            else:
 
                c.authors = c.file_history = []
 
        except RepositoryError as e:
 
            h.flash(unicode(e), category='error')
 
            h.flash(e, category='error')
 
            raise HTTPNotFound()
 

	
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            return render('files/files_ypjax.html')
 

	
 
        # TODO: tags and bookmarks?
 
        c.revision_options = [(c.changeset.raw_id,
 
                              _('%s at %s') % (b, h.short_id(c.changeset.raw_id))) for b in c.changeset.branches] + \
 
            [(n, b) for b, n in c.db_repo_scm_instance.branches.items()]
 
        if c.db_repo_scm_instance.closed_branches:
 
            prefix = _('(closed)') + ' '
 
            c.revision_options += [('-', '-')] + \
 
                [(n, prefix + b) for b, n in c.db_repo_scm_instance.closed_branches.items()]
 

	
 
        return render('files/files.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def history(self, repo_name, revision, f_path):
 
        changeset = self.__get_cs(revision)
 
        _file = changeset.get_node(f_path)
 
        if _file.is_file():
 
            file_history, _hist = self._get_node_history(changeset, f_path)
 

	
 
            res = []
 
            for obj in file_history:
 
                res.append({
 
                    'text': obj[1],
 
                    'children': [{'id': o[0], 'text': o[1]} for o in obj[0]]
 
                })
 

	
 
            data = {
 
                'more': False,
 
                'results': res
 
            }
 
            return data
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def authors(self, repo_name, revision, f_path):
 
        changeset = self.__get_cs(revision)
 
        _file = changeset.get_node(f_path)
 
        if _file.is_file():
 
            file_history, _hist = self._get_node_history(changeset, f_path)
 
            c.authors = []
 
            for a in set([x.author for x in _hist]):
 
                c.authors.append((h.email(a), h.person(a)))
 
            return render('files/files_history_box.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def rawfile(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        response.content_disposition = 'attachment; filename=%s' % \
 
            safe_str(f_path.split(Repository.url_sep())[-1])
 

	
 
        response.content_type = file_node.mimetype
 
        return file_node.content
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def raw(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        raw_mimetype_mapping = {
 
            # map original mimetype to a mimetype used for "show as raw"
 
            # you can also provide a content-disposition to override the
 
            # default "attachment" disposition.
 
            # orig_type: (new_type, new_dispo)
 

	
 
            # show images inline:
 
            'image/x-icon': ('image/x-icon', 'inline'),
 
            'image/png': ('image/png', 'inline'),
 
            'image/gif': ('image/gif', 'inline'),
 
            'image/jpeg': ('image/jpeg', 'inline'),
 
            'image/svg+xml': ('image/svg+xml', 'inline'),
 
        }
 

	
 
        mimetype = file_node.mimetype
 
        try:
 
            mimetype, dispo = raw_mimetype_mapping[mimetype]
 
        except KeyError:
 
            # we don't know anything special about this, handle it safely
 
            if file_node.is_binary:
 
                # do same as download raw for binary files
 
                mimetype, dispo = 'application/octet-stream', 'attachment'
 
            else:
 
                # do not just use the original mimetype, but force text/plain,
 
                # otherwise it would serve text/html and that might be unsafe.
 
                # Note: underlying vcs library fakes text/plain mimetype if the
 
                # mimetype can not be determined and it thinks it is not
 
                # binary.This might lead to erroneous text display in some
 
                # cases, but helps in other cases, like with text files
 
                # without extension.
 
                mimetype, dispo = 'text/plain', 'inline'
 

	
 
        if dispo == 'attachment':
 
            dispo = 'attachment; filename=%s' % \
 
                        safe_str(f_path.split(os.sep)[-1])
 

	
 
        response.content_disposition = dispo
 
        response.content_type = mimetype
 
        return file_node.content
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('write')
 
    def delete(self, repo_name, revision, f_path):
 
        repo = c.db_repo
 
        # check if revision is a branch identifier- basically we cannot
 
        # create multiple heads via file editing
 
        _branches = repo.scm_instance.branches
 
        # check if revision is a branch name or branch hash
 
        if revision not in _branches and revision not in _branches.values():
 
            h.flash(_('You can only delete files with revision '
 
                      'being a valid branch'), category='warning')
 
            raise HTTPFound(location=h.url('files_home',
 
                                  repo_name=repo_name, revision='tip',
 
                                  f_path=f_path))
 

	
 
        r_post = request.POST
 

	
 
        c.cs = self.__get_cs(revision)
 
        c.file = self.__get_filenode(c.cs, f_path)
 

	
 
        c.default_message = _('Deleted file %s via Kallithea') % (f_path)
 
        c.f_path = f_path
 
        node_path = f_path
 
        author = request.authuser.full_contact
 

	
 
        if r_post:
 
            message = r_post.get('message') or c.default_message
 

	
 
            try:
 
                nodes = {
 
                    node_path: {
 
                        'content': ''
 
                    }
 
                }
 
                self.scm_model.delete_nodes(
 
                    user=request.authuser.user_id,
 
                    ip_addr=request.ip_addr,
 
                    repo=c.db_repo,
 
                    message=message,
 
                    nodes=nodes,
 
                    parent_cs=c.cs,
 
                    author=author,
 
                )
 

	
 
                h.flash(_('Successfully deleted file %s') % f_path,
 
                        category='success')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during commit'), category='error')
 
            raise HTTPFound(location=url('changeset_home',
 
                                repo_name=c.repo_name, revision='tip'))
 

	
 
        return render('files/files_delete.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('write')
 
    def edit(self, repo_name, revision, f_path):
 
        repo = c.db_repo
 
        # check if revision is a branch identifier- basically we cannot
 
        # create multiple heads via file editing
 
        _branches = repo.scm_instance.branches
 
        # check if revision is a branch name or branch hash
 
        if revision not in _branches and revision not in _branches.values():
 
            h.flash(_('You can only edit files with revision '
 
                      'being a valid branch'), category='warning')
 
            raise HTTPFound(location=h.url('files_home',
 
                                  repo_name=repo_name, revision='tip',
 
                                  f_path=f_path))
 

	
 
        r_post = request.POST
 

	
 
        c.cs = self.__get_cs(revision)
 
        c.file = self.__get_filenode(c.cs, f_path)
 

	
 
        if c.file.is_binary:
 
            raise HTTPFound(location=url('files_home', repo_name=c.repo_name,
 
                            revision=c.cs.raw_id, f_path=f_path))
 
        c.default_message = _('Edited file %s via Kallithea') % (f_path)
 
        c.f_path = f_path
 

	
 
        if r_post:
 
            old_content = safe_unicode(c.file.content)
 
            sl = old_content.splitlines(1)
 
            first_line = sl[0] if sl else ''
kallithea/controllers/pullrequests.py
Show inline comments
 
@@ -151,408 +151,408 @@ class PullrequestsController(BaseRepoCon
 

	
 
        # prio 1: rev was selected as existing entry above
 

	
 
        # prio 2: create special entry for rev; rev _must_ be used
 
        specials = []
 
        if rev and selected is None:
 
            selected = 'rev:%s:%s' % (rev, rev)
 
            specials = [(selected, '%s: %s' % (_("Changeset"), rev[:12]))]
 

	
 
        # prio 3: most recent peer branch
 
        if peers and not selected:
 
            selected = peers[0][0]
 

	
 
        # prio 4: tip revision
 
        if not selected:
 
            if h.is_hg(repo):
 
                if tipbranch:
 
                    selected = 'branch:%s:%s' % (tipbranch, tiprev)
 
                else:
 
                    selected = 'tag:null:' + repo.EMPTY_CHANGESET
 
                    tags.append((selected, 'null'))
 
            else:
 
                if 'master' in repo.branches:
 
                    selected = 'branch:master:%s' % repo.branches['master']
 
                else:
 
                    k, v = list(repo.branches.items())[0]
 
                    selected = 'branch:%s:%s' % (k, v)
 

	
 
        groups = [(specials, _("Special")),
 
                  (peers, _("Peer branches")),
 
                  (bookmarks, _("Bookmarks")),
 
                  (branches, _("Branches")),
 
                  (tags, _("Tags")),
 
                  ]
 
        return [g for g in groups if g[0]], selected
 

	
 
    def _is_allowed_to_change_status(self, pull_request):
 
        if pull_request.is_closed():
 
            return False
 

	
 
        owner = request.authuser.user_id == pull_request.owner_id
 
        reviewer = PullRequestReviewer.query() \
 
            .filter(PullRequestReviewer.pull_request == pull_request) \
 
            .filter(PullRequestReviewer.user_id == request.authuser.user_id) \
 
            .count() != 0
 

	
 
        return request.authuser.admin or owner or reviewer
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def show_all(self, repo_name):
 
        c.from_ = request.GET.get('from_') or ''
 
        c.closed = request.GET.get('closed') or ''
 
        url_params = {}
 
        if c.from_:
 
            url_params['from_'] = 1
 
        if c.closed:
 
            url_params['closed'] = 1
 
        p = safe_int(request.GET.get('page'), 1)
 

	
 
        q = PullRequest.query(include_closed=c.closed, sorted=True)
 
        if c.from_:
 
            q = q.filter_by(org_repo=c.db_repo)
 
        else:
 
            q = q.filter_by(other_repo=c.db_repo)
 
        c.pull_requests = q.all()
 

	
 
        c.pullrequests_pager = Page(c.pull_requests, page=p, items_per_page=100, **url_params)
 

	
 
        return render('/pullrequests/pullrequest_show_all.html')
 

	
 
    @LoginRequired()
 
    def show_my(self):
 
        c.closed = request.GET.get('closed') or ''
 

	
 
        c.my_pull_requests = PullRequest.query(
 
            include_closed=c.closed,
 
            sorted=True,
 
        ).filter_by(owner_id=request.authuser.user_id).all()
 

	
 
        c.participate_in_pull_requests = []
 
        c.participate_in_pull_requests_todo = []
 
        done_status = set([ChangesetStatus.STATUS_APPROVED, ChangesetStatus.STATUS_REJECTED])
 
        for pr in PullRequest.query(
 
            include_closed=c.closed,
 
            reviewer_id=request.authuser.user_id,
 
            sorted=True,
 
        ):
 
            status = pr.user_review_status(request.authuser.user_id) # very inefficient!!!
 
            if status in done_status:
 
                c.participate_in_pull_requests.append(pr)
 
            else:
 
                c.participate_in_pull_requests_todo.append(pr)
 

	
 
        return render('/pullrequests/pullrequest_show_my.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self):
 
        org_repo = c.db_repo
 
        org_scm_instance = org_repo.scm_instance
 
        try:
 
            org_scm_instance.get_changeset()
 
        except EmptyRepositoryError as e:
 
            h.flash(_('There are no changesets yet'),
 
                    category='warning')
 
            raise HTTPFound(location=url('summary_home', repo_name=org_repo.repo_name))
 

	
 
        org_rev = request.GET.get('rev_end')
 
        # rev_start is not directly useful - its parent could however be used
 
        # as default for other and thus give a simple compare view
 
        rev_start = request.GET.get('rev_start')
 
        other_rev = None
 
        if rev_start:
 
            starters = org_repo.get_changeset(rev_start).parents
 
            if starters:
 
                other_rev = starters[0].raw_id
 
            else:
 
                other_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        branch = request.GET.get('branch')
 

	
 
        c.cs_repos = [(org_repo.repo_name, org_repo.repo_name)]
 
        c.default_cs_repo = org_repo.repo_name
 
        c.cs_refs, c.default_cs_ref = self._get_repo_refs(org_scm_instance, rev=org_rev, branch=branch)
 

	
 
        default_cs_ref_type, default_cs_branch, default_cs_rev = c.default_cs_ref.split(':')
 
        if default_cs_ref_type != 'branch':
 
            default_cs_branch = org_repo.get_changeset(default_cs_rev).branch
 

	
 
        # add org repo to other so we can open pull request against peer branches on itself
 
        c.a_repos = [(org_repo.repo_name, '%s (self)' % org_repo.repo_name)]
 

	
 
        if org_repo.parent:
 
            # add parent of this fork also and select it.
 
            # use the same branch on destination as on source, if available.
 
            c.a_repos.append((org_repo.parent.repo_name, '%s (parent)' % org_repo.parent.repo_name))
 
            c.a_repo = org_repo.parent
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(
 
                    org_repo.parent.scm_instance, branch=default_cs_branch, rev=other_rev)
 

	
 
        else:
 
            c.a_repo = org_repo
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(org_scm_instance, rev=other_rev)
 

	
 
        # gather forks and add to this list ... even though it is rare to
 
        # request forks to pull from their parent
 
        for fork in org_repo.forks:
 
            c.a_repos.append((fork.repo_name, fork.repo_name))
 

	
 
        return render('/pullrequests/pullrequest.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def repo_info(self, repo_name):
 
        repo = c.db_repo
 
        refs, selected_ref = self._get_repo_refs(repo.scm_instance)
 
        return {
 
            'description': repo.description.split('\n', 1)[0],
 
            'selected_ref': selected_ref,
 
            'refs': refs,
 
            }
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def create(self, repo_name):
 
        repo = c.db_repo
 
        try:
 
            _form = PullRequestForm(repo.repo_id)().to_python(request.POST)
 
        except formencode.Invalid as errors:
 
            log.error(traceback.format_exc())
 
            log.error(str(errors))
 
            msg = _('Error creating pull request: %s') % errors.msg
 
            h.flash(msg, 'error')
 
            raise HTTPBadRequest
 

	
 
        # heads up: org and other might seem backward here ...
 
        org_ref = _form['org_ref'] # will have merge_rev as rev but symbolic name
 
        org_repo = Repository.guess_instance(_form['org_repo'])
 

	
 
        other_ref = _form['other_ref'] # will have symbolic name and head revision
 
        other_repo = Repository.guess_instance(_form['other_repo'])
 

	
 
        reviewers = []
 

	
 
        title = _form['pullrequest_title']
 
        description = _form['pullrequest_desc'].strip()
 
        owner = User.get(request.authuser.user_id)
 

	
 
        try:
 
            cmd = CreatePullRequestAction(org_repo, other_repo, org_ref, other_ref, title, description, owner, reviewers)
 
        except CreatePullRequestAction.ValidationError as e:
 
            h.flash(str(e), category='error', logf=log.error)
 
            h.flash(e, category='error', logf=log.error)
 
            raise HTTPNotFound
 

	
 
        try:
 
            pull_request = cmd.execute()
 
            Session().commit()
 
        except Exception:
 
            h.flash(_('Error occurred while creating pull request'),
 
                    category='error')
 
            log.error(traceback.format_exc())
 
            raise HTTPFound(location=url('pullrequest_home', repo_name=repo_name))
 

	
 
        h.flash(_('Successfully opened new pull request'),
 
                category='success')
 
        raise HTTPFound(location=pull_request.url())
 

	
 
    def create_new_iteration(self, old_pull_request, new_rev, title, description, reviewers):
 
        owner = User.get(request.authuser.user_id)
 
        new_org_rev = self._get_ref_rev(old_pull_request.org_repo, 'rev', new_rev)
 
        new_other_rev = self._get_ref_rev(old_pull_request.other_repo, old_pull_request.other_ref_parts[0], old_pull_request.other_ref_parts[1])
 
        try:
 
            cmd = CreatePullRequestIterationAction(old_pull_request, new_org_rev, new_other_rev, title, description, owner, reviewers)
 
        except CreatePullRequestAction.ValidationError as e:
 
            h.flash(str(e), category='error', logf=log.error)
 
            h.flash(e, category='error', logf=log.error)
 
            raise HTTPNotFound
 

	
 
        try:
 
            pull_request = cmd.execute()
 
            Session().commit()
 
        except Exception:
 
            h.flash(_('Error occurred while creating pull request'),
 
                    category='error')
 
            log.error(traceback.format_exc())
 
            raise HTTPFound(location=old_pull_request.url())
 

	
 
        h.flash(_('New pull request iteration created'),
 
                category='success')
 
        raise HTTPFound(location=pull_request.url())
 

	
 
    # pullrequest_post for PR editing
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def post(self, repo_name, pull_request_id):
 
        pull_request = PullRequest.get_or_404(pull_request_id)
 
        if pull_request.is_closed():
 
            raise HTTPForbidden()
 
        assert pull_request.other_repo.repo_name == repo_name
 
        # only owner or admin can update it
 
        owner = pull_request.owner_id == request.authuser.user_id
 
        repo_admin = h.HasRepoPermissionLevel('admin')(c.repo_name)
 
        if not (h.HasPermissionAny('hg.admin')() or repo_admin or owner):
 
            raise HTTPForbidden()
 

	
 
        _form = PullRequestPostForm()().to_python(request.POST)
 

	
 
        cur_reviewers = set(pull_request.get_reviewer_users())
 
        new_reviewers = set(_get_reviewer(s) for s in _form['review_members'])
 
        old_reviewers = set(_get_reviewer(s) for s in _form['org_review_members'])
 

	
 
        other_added = cur_reviewers - old_reviewers
 
        other_removed = old_reviewers - cur_reviewers
 

	
 
        if other_added:
 
            h.flash(_('Meanwhile, the following reviewers have been added: %s') %
 
                    (', '.join(u.username for u in other_added)),
 
                    category='warning')
 
        if other_removed:
 
            h.flash(_('Meanwhile, the following reviewers have been removed: %s') %
 
                    (', '.join(u.username for u in other_removed)),
 
                    category='warning')
 

	
 
        if _form['updaterev']:
 
            return self.create_new_iteration(pull_request,
 
                                      _form['updaterev'],
 
                                      _form['pullrequest_title'],
 
                                      _form['pullrequest_desc'],
 
                                      new_reviewers)
 

	
 
        added_reviewers = new_reviewers - old_reviewers - cur_reviewers
 
        removed_reviewers = (old_reviewers - new_reviewers) & cur_reviewers
 

	
 
        old_description = pull_request.description
 
        pull_request.title = _form['pullrequest_title']
 
        pull_request.description = _form['pullrequest_desc'].strip() or _('No description')
 
        pull_request.owner = User.get_by_username(_form['owner'])
 
        user = User.get(request.authuser.user_id)
 

	
 
        PullRequestModel().mention_from_description(user, pull_request, old_description)
 
        PullRequestModel().add_reviewers(user, pull_request, added_reviewers)
 
        PullRequestModel().remove_reviewers(user, pull_request, removed_reviewers)
 

	
 
        Session().commit()
 
        h.flash(_('Pull request updated'), category='success')
 

	
 
        raise HTTPFound(location=pull_request.url())
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def delete(self, repo_name, pull_request_id):
 
        pull_request = PullRequest.get_or_404(pull_request_id)
 
        # only owner can delete it !
 
        if pull_request.owner_id == request.authuser.user_id:
 
            PullRequestModel().delete(pull_request)
 
            Session().commit()
 
            h.flash(_('Successfully deleted pull request'),
 
                    category='success')
 
            raise HTTPFound(location=url('my_pullrequests'))
 
        raise HTTPForbidden()
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def show(self, repo_name, pull_request_id, extra=None):
 
        c.pull_request = PullRequest.get_or_404(pull_request_id)
 
        c.allowed_to_change_status = self._is_allowed_to_change_status(c.pull_request)
 
        cc_model = ChangesetCommentsModel()
 
        cs_model = ChangesetStatusModel()
 

	
 
        # pull_requests repo_name we opened it against
 
        # ie. other_repo must match
 
        if repo_name != c.pull_request.other_repo.repo_name:
 
            raise HTTPNotFound
 

	
 
        # load compare data into template context
 
        c.cs_repo = c.pull_request.org_repo
 
        (c.cs_ref_type,
 
         c.cs_ref_name,
 
         c.cs_rev) = c.pull_request.org_ref.split(':')
 

	
 
        c.a_repo = c.pull_request.other_repo
 
        (c.a_ref_type,
 
         c.a_ref_name,
 
         c.a_rev) = c.pull_request.other_ref.split(':') # a_rev is ancestor
 

	
 
        org_scm_instance = c.cs_repo.scm_instance # property with expensive cache invalidation check!!!
 
        try:
 
            c.cs_ranges = []
 
            for x in c.pull_request.revisions:
 
                c.cs_ranges.append(org_scm_instance.get_changeset(x))
 
        except ChangesetDoesNotExistError:
 
            c.cs_ranges = []
 
            h.flash(_('Revision %s not found in %s') % (x, c.cs_repo.repo_name),
 
                'error')
 
        c.cs_ranges_org = None # not stored and not important and moving target - could be calculated ...
 
        revs = [ctx.revision for ctx in reversed(c.cs_ranges)]
 
        c.jsdata = graph_data(org_scm_instance, revs)
 

	
 
        c.is_range = False
 
        try:
 
            if c.a_ref_type == 'rev': # this looks like a free range where target is ancestor
 
                cs_a = org_scm_instance.get_changeset(c.a_rev)
 
                root_parents = c.cs_ranges[0].parents
 
                c.is_range = cs_a in root_parents
 
                #c.merge_root = len(root_parents) > 1 # a range starting with a merge might deserve a warning
 
        except ChangesetDoesNotExistError: # probably because c.a_rev not found
 
            pass
 
        except IndexError: # probably because c.cs_ranges is empty, probably because revisions are missing
 
            pass
 

	
 
        avail_revs = set()
 
        avail_show = []
 
        c.cs_branch_name = c.cs_ref_name
 
        c.a_branch_name = None
 
        other_scm_instance = c.a_repo.scm_instance
 
        c.update_msg = ""
 
        c.update_msg_other = ""
 
        try:
 
            if not c.cs_ranges:
 
                c.update_msg = _('Error: changesets not found when displaying pull request from %s.') % c.cs_rev
 
            elif org_scm_instance.alias == 'hg' and c.a_ref_name != 'ancestor':
 
                if c.cs_ref_type != 'branch':
 
                    c.cs_branch_name = org_scm_instance.get_changeset(c.cs_ref_name).branch # use ref_type ?
 
                c.a_branch_name = c.a_ref_name
 
                if c.a_ref_type != 'branch':
 
                    try:
 
                        c.a_branch_name = other_scm_instance.get_changeset(c.a_ref_name).branch # use ref_type ?
 
                    except EmptyRepositoryError:
 
                        c.a_branch_name = 'null' # not a branch name ... but close enough
 
                # candidates: descendants of old head that are on the right branch
 
                #             and not are the old head itself ...
 
                #             and nothing at all if old head is a descendant of target ref name
 
                if not c.is_range and other_scm_instance._repo.revs('present(%s)::&%s', c.cs_ranges[-1].raw_id, c.a_branch_name):
 
                    c.update_msg = _('This pull request has already been merged to %s.') % c.a_branch_name
 
                elif c.pull_request.is_closed():
 
                    c.update_msg = _('This pull request has been closed and can not be updated.')
 
                else: # look for descendants of PR head on source branch in org repo
 
                    avail_revs = org_scm_instance._repo.revs('%s:: & branch(%s)',
 
                                                             revs[0], c.cs_branch_name)
 
                    if len(avail_revs) > 1: # more than just revs[0]
 
                        # also show changesets that not are descendants but would be merged in
 
                        targethead = other_scm_instance.get_changeset(c.a_branch_name).raw_id
 
                        if org_scm_instance.path != other_scm_instance.path:
 
                            # Note: org_scm_instance.path must come first so all
 
                            # valid revision numbers are 100% org_scm compatible
 
                            # - both for avail_revs and for revset results
 
                            hgrepo = unionrepo.makeunionrepository(org_scm_instance.baseui,
 
                                                                   org_scm_instance.path,
 
                                                                   other_scm_instance.path)
 
                        else:
 
                            hgrepo = org_scm_instance._repo
 
                        show = set(hgrepo.revs('::%ld & !::parents(%s) & !::%s',
 
                                               avail_revs, revs[0], targethead))
 
                        if show:
 
                            c.update_msg = _('The following additional changes are available on %s:') % c.cs_branch_name
 
                        else:
 
                            c.update_msg = _('No additional changesets found for iterating on this pull request.')
 
                    else:
 
                        show = set()
 
                        avail_revs = set() # drop revs[0]
 
                        c.update_msg = _('No additional changesets found for iterating on this pull request.')
 

	
 
                    # TODO: handle branch heads that not are tip-most
 
                    brevs = org_scm_instance._repo.revs('%s - %ld - %s', c.cs_branch_name, avail_revs, revs[0])
 
                    if brevs:
 
                        # also show changesets that are on branch but neither ancestors nor descendants
 
                        show.update(org_scm_instance._repo.revs('::%ld - ::%ld - ::%s', brevs, avail_revs, c.a_branch_name))
kallithea/controllers/summary.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.summary
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Summary controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import calendar
 
import itertools
 
import logging
 
import traceback
 
from datetime import date, timedelta
 
from time import mktime
 

	
 
from beaker.cache import cache_region
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.config.conf import ALL_EXTS, ALL_READMES, LANGUAGES_EXTENSIONS_MAP
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.celerylib.tasks import get_commits_stats
 
from kallithea.lib.compat import json
 
from kallithea.lib.markup_renderer import MarkupRenderer
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import safe_int, safe_unicode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetError, EmptyRepositoryError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.model.db import Statistics
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
README_FILES = [''.join([x[0][0], x[1][0]]) for x in
 
                    sorted(list(itertools.product(ALL_READMES, ALL_EXTS)),
 
                           key=lambda y:y[0][1] + y[1][1])]
 

	
 

	
 
class SummaryController(BaseRepoController):
 

	
 
    def __get_readme_data(self, db_repo):
 
        repo_name = db_repo.repo_name
 
        log.debug('Looking for README file')
 

	
 
        @cache_region('long_term', '_get_readme_from_cache')
 
        def _get_readme_from_cache(*_cache_keys):  # parameters are not really used - only as caching key
 
            readme_data = None
 
            readme_file = None
 
            try:
 
                # gets the landing revision! or tip if fails
 
                cs = db_repo.get_landing_changeset()
 
                if isinstance(cs, EmptyChangeset):
 
                    raise EmptyRepositoryError()
 
                renderer = MarkupRenderer()
 
                for f in README_FILES:
 
                    try:
 
                        readme = cs.get_node(f)
 
                        if not isinstance(readme, FileNode):
 
                            continue
 
                        readme_file = f
 
                        log.debug('Found README file `%s` rendering...',
 
                                  readme_file)
 
                        readme_data = renderer.render(safe_unicode(readme.content),
 
                                                      filename=f)
 
                        break
 
                    except NodeDoesNotExistError:
 
                        continue
 
            except ChangesetError:
 
                log.error(traceback.format_exc())
 
                pass
 
            except EmptyRepositoryError:
 
                pass
 

	
 
            return readme_data, readme_file
 

	
 
        kind = 'README'
 
        return _get_readme_from_cache(repo_name, kind, c.db_repo.changeset_cache.get('raw_id'))
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self, repo_name):
 
        p = safe_int(request.GET.get('page'), 1)
 
        size = safe_int(request.GET.get('size'), 10)
 
        try:
 
            collection = c.db_repo_scm_instance.get_changesets(reverse=True)
 
        except EmptyRepositoryError as e:
 
            h.flash(unicode(e), category='warning')
 
            h.flash(e, category='warning')
 
            collection = []
 
        c.cs_pagination = Page(collection, page=p, items_per_page=size)
 
        page_revisions = [x.raw_id for x in list(c.cs_pagination)]
 
        c.cs_comments = c.db_repo.get_comments(page_revisions)
 
        c.cs_statuses = c.db_repo.statuses(page_revisions)
 

	
 
        c.ssh_repo_url = None
 
        if request.authuser.is_default_user:
 
            username = None
 
        else:
 
            username = request.authuser.username
 
            if c.ssh_enabled:
 
                c.ssh_repo_url = c.db_repo.clone_url(clone_uri_tmpl=c.clone_ssh_tmpl)
 

	
 
        c.clone_repo_url = c.db_repo.clone_url(clone_uri_tmpl=c.clone_uri_tmpl, with_id=False, username=username)
 
        c.clone_repo_url_id = c.db_repo.clone_url(clone_uri_tmpl=c.clone_uri_tmpl, with_id=True, username=username)
 

	
 
        if c.db_repo.enable_statistics:
 
            c.show_stats = True
 
        else:
 
            c.show_stats = False
 

	
 
        stats = Statistics.query() \
 
            .filter(Statistics.repository == c.db_repo) \
 
            .scalar()
 

	
 
        c.stats_percentage = 0
 

	
 
        if stats and stats.languages:
 
            c.no_data = False is c.db_repo.enable_statistics
 
            lang_stats_d = json.loads(stats.languages)
 

	
 
            lang_stats = [(x, {"count": y,
 
                               "desc": LANGUAGES_EXTENSIONS_MAP.get(x, '?')})
 
                          for x, y in lang_stats_d.items()]
 
            lang_stats.sort(key=lambda k: (-k[1]['count'], k[0]))
 

	
 
            c.trending_languages = lang_stats[:10]
 
        else:
 
            c.no_data = True
 
            c.trending_languages = []
 

	
 
        c.enable_downloads = c.db_repo.enable_downloads
 
        c.readme_data, c.readme_file = \
 
            self.__get_readme_data(c.db_repo)
 
        return render('summary/summary.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def repo_size(self, repo_name):
 
        if request.is_xhr:
 
            return c.db_repo._repo_size()
 
        else:
 
            raise HTTPBadRequest()
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def statistics(self, repo_name):
 
        if c.db_repo.enable_statistics:
 
            c.show_stats = True
 
            c.no_data_msg = _('No data ready yet')
 
        else:
 
            c.show_stats = False
 
            c.no_data_msg = _('Statistics are disabled for this repository')
 

	
 
        td = date.today() + timedelta(days=1)
 
        td_1m = td - timedelta(days=calendar.mdays[td.month])
 
        td_1y = td - timedelta(days=365)
 

	
 
        ts_min_m = mktime(td_1m.timetuple())
 
        ts_min_y = mktime(td_1y.timetuple())
 
        ts_max_y = mktime(td.timetuple())
 
        c.ts_min = ts_min_m
 
        c.ts_max = ts_max_y
 

	
 
        stats = Statistics.query() \
 
            .filter(Statistics.repository == c.db_repo) \
 
            .scalar()
 
        c.stats_percentage = 0
 
        if stats and stats.languages:
 
            c.no_data = False is c.db_repo.enable_statistics
 
            lang_stats_d = json.loads(stats.languages)
 
            c.commit_data = json.loads(stats.commit_activity)
 
            c.overview_data = json.loads(stats.commit_activity_combined)
 

	
 
            lang_stats = ((x, {"count": y,
 
                               "desc": LANGUAGES_EXTENSIONS_MAP.get(x)})
 
                          for x, y in lang_stats_d.items())
 

	
 
            c.trending_languages = (
 
                sorted(lang_stats, reverse=True, key=lambda k: k[1])[:10]
 
            )
 
            last_rev = stats.stat_on_revision + 1
 
            c.repo_last_rev = c.db_repo_scm_instance.count() \
 
                if c.db_repo_scm_instance.revisions else 0
 
            if last_rev == 0 or c.repo_last_rev == 0:
 
                pass
 
            else:
 
                c.stats_percentage = '%.2f' % ((float((last_rev)) /
 
                                                c.repo_last_rev) * 100)
 
        else:
 
            c.commit_data = {}
 
            c.overview_data = ([[ts_min_y, 0], [ts_max_y, 10]])
 
            c.trending_languages = {}
 
            c.no_data = True
 

	
 
        recurse_limit = 500  # don't recurse more than 500 times when parsing
 
        get_commits_stats(c.db_repo.repo_name, ts_min_y, ts_max_y, recurse_limit)
 
        return render('summary/statistics.html')
kallithea/lib/base.py
Show inline comments
 
@@ -415,231 +415,231 @@ class BaseController(TGController):
 

	
 
        c.my_pr_count = PullRequest.query(reviewer_id=request.authuser.user_id, include_closed=False).count()
 

	
 
        self.scm_model = ScmModel()
 

	
 
    @staticmethod
 
    def _determine_auth_user(session_authuser, ip_addr):
 
        """
 
        Create an `AuthUser` object given the API key/bearer token
 
        (if any) and the value of the authuser session cookie.
 
        Returns None if no valid user is found (like not active or no access for IP).
 
        """
 

	
 
        # Authenticate by session cookie
 
        # In ancient login sessions, 'authuser' may not be a dict.
 
        # In that case, the user will have to log in again.
 
        # v0.3 and earlier included an 'is_authenticated' key; if present,
 
        # this must be True.
 
        if isinstance(session_authuser, dict) and session_authuser.get('is_authenticated', True):
 
            return AuthUser.from_cookie(session_authuser, ip_addr=ip_addr)
 

	
 
        # Authenticate by auth_container plugin (if enabled)
 
        if any(
 
            plugin.is_container_auth
 
            for plugin in auth_modules.get_auth_plugins()
 
        ):
 
            try:
 
                user_info = auth_modules.authenticate('', '', request.environ)
 
            except UserCreationError as e:
 
                from kallithea.lib import helpers as h
 
                h.flash(e, 'error', logf=log.error)
 
            else:
 
                if user_info is not None:
 
                    username = user_info['username']
 
                    user = User.get_by_username(username, case_insensitive=True)
 
                    return log_in_user(user, remember=False, is_external_auth=True, ip_addr=ip_addr)
 

	
 
        # User is default user (if active) or anonymous
 
        default_user = User.get_default_user(cache=True)
 
        authuser = AuthUser.make(dbuser=default_user, ip_addr=ip_addr)
 
        if authuser is None: # fall back to anonymous
 
            authuser = AuthUser(dbuser=default_user) # TODO: somehow use .make?
 
        return authuser
 

	
 
    @staticmethod
 
    def _basic_security_checks():
 
        """Perform basic security/sanity checks before processing the request."""
 

	
 
        # Only allow the following HTTP request methods.
 
        if request.method not in ['GET', 'HEAD', 'POST']:
 
            raise webob.exc.HTTPMethodNotAllowed()
 

	
 
        # Also verify the _method override - no longer allowed.
 
        if request.params.get('_method') is None:
 
            pass # no override, no problem
 
        else:
 
            raise webob.exc.HTTPMethodNotAllowed()
 

	
 
        # Make sure CSRF token never appears in the URL. If so, invalidate it.
 
        from kallithea.lib import helpers as h
 
        if h.session_csrf_secret_name in request.GET:
 
            log.error('CSRF key leak detected')
 
            session.pop(h.session_csrf_secret_name, None)
 
            session.save()
 
            h.flash(_('CSRF token leak has been detected - all form tokens have been expired'),
 
                    category='error')
 

	
 
        # WebOb already ignores request payload parameters for anything other
 
        # than POST/PUT, but double-check since other Kallithea code relies on
 
        # this assumption.
 
        if request.method not in ['POST', 'PUT'] and request.POST:
 
            log.error('%r request with payload parameters; WebOb should have stopped this', request.method)
 
            raise webob.exc.HTTPBadRequest()
 

	
 
    def __call__(self, environ, context):
 
        try:
 
            ip_addr = _get_ip_addr(environ)
 
            self._basic_security_checks()
 

	
 
            api_key = request.GET.get('api_key')
 
            try:
 
                # Request.authorization may raise ValueError on invalid input
 
                type, params = request.authorization
 
            except (ValueError, TypeError):
 
                pass
 
            else:
 
                if type.lower() == 'bearer':
 
                    api_key = params # bearer token is an api key too
 

	
 
            if api_key is None:
 
                authuser = self._determine_auth_user(
 
                    session.get('authuser'),
 
                    ip_addr=ip_addr,
 
                )
 
                needs_csrf_check = request.method not in ['GET', 'HEAD']
 

	
 
            else:
 
                dbuser = User.get_by_api_key(api_key)
 
                if dbuser is None:
 
                    log.info('No db user found for authentication with API key ****%s from %s',
 
                             api_key[-4:], ip_addr)
 
                authuser = AuthUser.make(dbuser=dbuser, is_external_auth=True, ip_addr=ip_addr)
 
                needs_csrf_check = False # API key provides CSRF protection
 

	
 
            if authuser is None:
 
                log.info('No valid user found')
 
                raise webob.exc.HTTPForbidden()
 

	
 
            # set globals for auth user
 
            request.authuser = authuser
 
            request.ip_addr = ip_addr
 
            request.needs_csrf_check = needs_csrf_check
 

	
 
            log.info('IP: %s User: %s accessed %s',
 
                request.ip_addr, request.authuser,
 
                get_path_info(environ),
 
            )
 
            return super(BaseController, self).__call__(environ, context)
 
        except webob.exc.HTTPException as e:
 
            return e
 

	
 

	
 
class BaseRepoController(BaseController):
 
    """
 
    Base class for controllers responsible for loading all needed data for
 
    repository loaded items are
 

	
 
    c.db_repo_scm_instance: instance of scm repository
 
    c.db_repo: instance of db
 
    c.repository_followers: number of followers
 
    c.repository_forks: number of forks
 
    c.repository_following: weather the current user is following the current repo
 
    """
 

	
 
    def _before(self, *args, **kwargs):
 
        super(BaseRepoController, self)._before(*args, **kwargs)
 
        if c.repo_name:  # extracted from routes
 
            _dbr = Repository.get_by_repo_name(c.repo_name)
 
            if not _dbr:
 
                return
 

	
 
            log.debug('Found repository in database %s with state `%s`',
 
                      safe_unicode(_dbr), safe_unicode(_dbr.repo_state))
 
            route = getattr(request.environ.get('routes.route'), 'name', '')
 

	
 
            # allow to delete repos that are somehow damages in filesystem
 
            if route in ['delete_repo']:
 
                return
 

	
 
            if _dbr.repo_state in [Repository.STATE_PENDING]:
 
                if route in ['repo_creating_home']:
 
                    return
 
                check_url = url('repo_creating_home', repo_name=c.repo_name)
 
                raise webob.exc.HTTPFound(location=check_url)
 

	
 
            dbr = c.db_repo = _dbr
 
            c.db_repo_scm_instance = c.db_repo.scm_instance
 
            if c.db_repo_scm_instance is None:
 
                log.error('%s this repository is present in database but it '
 
                          'cannot be created as an scm instance', c.repo_name)
 
                from kallithea.lib import helpers as h
 
                h.flash(_('Repository not found in the filesystem'),
 
                        category='error')
 
                raise webob.exc.HTTPNotFound()
 

	
 
            # some globals counter for menu
 
            c.repository_followers = self.scm_model.get_followers(dbr)
 
            c.repository_forks = self.scm_model.get_forks(dbr)
 
            c.repository_pull_requests = self.scm_model.get_pull_requests(dbr)
 
            c.repository_following = self.scm_model.is_following_repo(
 
                                    c.repo_name, request.authuser.user_id)
 

	
 
    @staticmethod
 
    def _get_ref_rev(repo, ref_type, ref_name, returnempty=False):
 
        """
 
        Safe way to get changeset. If error occurs show error.
 
        """
 
        from kallithea.lib import helpers as h
 
        try:
 
            return repo.scm_instance.get_ref_revision(ref_type, ref_name)
 
        except EmptyRepositoryError as e:
 
            if returnempty:
 
                return repo.scm_instance.EMPTY_CHANGESET
 
            h.flash(_('There are no changesets yet'), category='error')
 
            raise webob.exc.HTTPNotFound()
 
        except ChangesetDoesNotExistError as e:
 
            h.flash(_('Changeset for %s %s not found in %s') %
 
                              (ref_type, ref_name, repo.repo_name),
 
                    category='error')
 
            raise webob.exc.HTTPNotFound()
 
        except RepositoryError as e:
 
            log.error(traceback.format_exc())
 
            h.flash(unicode(e), category='error')
 
            h.flash(e, category='error')
 
            raise webob.exc.HTTPBadRequest()
 

	
 

	
 
@decorator.decorator
 
def jsonify(func, *args, **kwargs):
 
    """Action decorator that formats output for JSON
 

	
 
    Given a function that will return content, this decorator will turn
 
    the result into JSON, with a content-type of 'application/json' and
 
    output it.
 
    """
 
    response.headers['Content-Type'] = 'application/json; charset=utf-8'
 
    data = func(*args, **kwargs)
 
    if isinstance(data, (list, tuple)):
 
        # A JSON list response is syntactically valid JavaScript and can be
 
        # loaded and executed as JavaScript by a malicious third-party site
 
        # using <script>, which can lead to cross-site data leaks.
 
        # JSON responses should therefore be scalars or objects (i.e. Python
 
        # dicts), because a JSON object is a syntax error if intepreted as JS.
 
        msg = "JSON responses with Array envelopes are susceptible to " \
 
              "cross-site data leak attacks, see " \
 
              "https://web.archive.org/web/20120519231904/http://wiki.pylonshq.com/display/pylonsfaq/Warnings"
 
        warnings.warn(msg, Warning, 2)
 
        log.warning(msg)
 
    log.debug("Returning JSON wrapped action output")
 
    return json.dumps(data)
 

	
 
@decorator.decorator
 
def IfSshEnabled(func, *args, **kwargs):
 
    """Decorator for functions that can only be called if SSH access is enabled.
 

	
 
    If SSH access is disabled in the configuration file, HTTPNotFound is raised.
 
    """
 
    if not c.ssh_enabled:
 
        from kallithea.lib import helpers as h
 
        h.flash(_("SSH access is disabled."), category='warning')
 
        raise webob.exc.HTTPNotFound()
 
    return func(*args, **kwargs)
kallithea/lib/helpers.py
Show inline comments
 
@@ -234,447 +234,447 @@ class CodeHtmlFormatter(HtmlFormatter):
 
    """
 
    My code Html Formatter for source codes
 
    """
 

	
 
    def wrap(self, source, outfile):
 
        return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
 

	
 
    def _wrap_code(self, source):
 
        for cnt, it in enumerate(source):
 
            i, t = it
 
            t = '<span id="L%s">%s</span>' % (cnt + 1, t)
 
            yield i, t
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        inner_lines = []
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            inner_lines.append(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        nocls = self.noclasses
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s%d" class="special">%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        # in case you wonder about the seemingly redundant <div> here: since the
 
        # content in the other cell also is wrapped in a div, some browsers in
 
        # some configurations seem to mess up the formatting...
 
        if nocls:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        else:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td class="linenos"><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        yield 0, ''.join(inner_lines)
 
        yield 0, '</td></tr></table>'
 

	
 

	
 
_whitespace_re = re.compile(r'(\t)|( )(?=\n|</div>)')
 

	
 

	
 
def _markup_whitespace(m):
 
    groups = m.groups()
 
    if groups[0]:
 
        return '<u>\t</u>'
 
    if groups[1]:
 
        return ' <i></i>'
 

	
 

	
 
def markup_whitespace(s):
 
    return _whitespace_re.sub(_markup_whitespace, s)
 

	
 

	
 
def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(markup_whitespace(
 
        code_highlight(safe_unicode(filenode.content), lexer, CodeHtmlFormatter(**kwargs))))
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 

	
 
    color_dict = {}
 

	
 
    def gen_color(n=10000):
 
        """generator for getting n of evenly distributed colors using
 
        hsv color and golden ratio. It always return same order of colors
 

	
 
        :returns: RGB tuple
 
        """
 

	
 
        def hsv_to_rgb(h, s, v):
 
            if s == 0.0:
 
                return v, v, v
 
            i = int(h * 6.0)  # XXX assume int() truncates!
 
            f = (h * 6.0) - i
 
            p = v * (1.0 - s)
 
            q = v * (1.0 - s * f)
 
            t = v * (1.0 - s * (1.0 - f))
 
            i = i % 6
 
            if i == 0:
 
                return v, t, p
 
            if i == 1:
 
                return q, v, p
 
            if i == 2:
 
                return p, v, t
 
            if i == 3:
 
                return p, q, v
 
            if i == 4:
 
                return t, p, v
 
            if i == 5:
 
                return v, p, q
 

	
 
        golden_ratio = 0.618033988749895
 
        h = 0.22717784590367374
 

	
 
        for _unused in xrange(n):
 
            h += golden_ratio
 
            h %= 1
 
            HSV_tuple = [h, 0.95, 0.95]
 
            RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
            yield [str(int(x * 256)) for x in RGB_tuple]
 

	
 
    cgenerator = gen_color()
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = cgenerator.next()
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(repo_name):
 

	
 
        def _url_func(changeset):
 
            author = escape(changeset.author)
 
            date = changeset.date
 
            message = escape(changeset.message)
 
            tooltip_html = ("<b>Author:</b> %s<br/>"
 
                            "<b>Date:</b> %s</b><br/>"
 
                            "<b>Message:</b> %s") % (author, date, message)
 

	
 
            lnk_format = show_id(changeset)
 
            uri = link_to(
 
                    lnk_format,
 
                    url('changeset_home', repo_name=repo_name,
 
                        revision=changeset.raw_id),
 
                    style=get_color_string(changeset.raw_id),
 
                    **{'data-toggle': 'popover',
 
                       'data-content': tooltip_html}
 
                  )
 

	
 
            uri += '\n'
 
            return uri
 
        return _url_func
 

	
 
    return literal(markup_whitespace(annotate_highlight(filenode, url_func(repo_name), **kwargs)))
 

	
 

	
 
class _Message(object):
 
    """A message returned by ``pop_flash_messages()``.
 

	
 
    Converting the message to a string returns the message text. Instances
 
    also have the following attributes:
 

	
 
    * ``message``: the message text.
 
    * ``category``: the category specified when the message was created.
 
    * ``message``: the html-safe message text.
 
    """
 

	
 
    def __init__(self, category, message):
 
        self.category = category
 
        self.message = message
 

	
 
    def __str__(self):
 
        return self.message
 

	
 
    __unicode__ = __str__
 

	
 
    def __html__(self):
 
        return escape(safe_unicode(self.message))
 

	
 

	
 
def _session_flash_messages(append=None, clear=False):
 
    """Manage a message queue in tg.session: return the current message queue
 
    after appending the given message, and possibly clearing the queue."""
 
    key = 'flash'
 
    from tg import session
 
    if key in session:
 
        flash_messages = session[key]
 
    else:
 
        if append is None:  # common fast path - also used for clearing empty queue
 
            return []  # don't bother saving
 
        flash_messages = []
 
        session[key] = flash_messages
 
    if append is not None and append not in flash_messages:
 
        flash_messages.append(append)
 
    if clear:
 
        session.pop(key, None)
 
    session.save()
 
    return flash_messages
 

	
 

	
 
def flash(message, category=None, logf=None):
 
def flash(message, category, logf=None):
 
    """
 
    Show a message to the user _and_ log it through the specified function
 

	
 
    category: notice (default), warning, error, success
 
    logf: a custom log function - such as log.debug
 

	
 
    logf defaults to log.info, unless category equals 'success', in which
 
    case logf defaults to log.debug.
 
    """
 
    assert category in ('error', 'success', 'warning'), category
 
    if hasattr(message, '__html__'):
 
        # render to HTML for storing in cookie
 
        safe_message = unicode(message)
 
    else:
 
        # Apply str - the message might be an exception with __str__
 
        # Escape, so we can trust the result without further escaping, without any risk of injection
 
        safe_message = html_escape(unicode(message))
 
    if logf is None:
 
        logf = log.info
 
        if category == 'success':
 
            logf = log.debug
 

	
 
    logf('Flash %s: %s', category, message)
 
    logf('Flash %s: %s', category, safe_message)
 

	
 
    _session_flash_messages(append=(category, message))
 
    _session_flash_messages(append=(category, safe_message))
 

	
 

	
 
def pop_flash_messages():
 
    """Return all accumulated messages and delete them from the session.
 

	
 
    The return value is a list of ``Message`` objects.
 
    """
 
    return [_Message(*m) for m in _session_flash_messages(clear=True)]
 
    return [_Message(category, message) for category, message in _session_flash_messages(clear=True)]
 

	
 

	
 
age = lambda x, y=False: _age(x, y)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    from kallithea import CONFIG
 
    def_len = safe_int(CONFIG.get('show_sha_length', 12))
 
    show_rev = str2bool(CONFIG.get('show_revision_number', False))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return raw_id
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        return date.strftime("%Y-%m-%d %H:%M:%S")
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
@cache_region('long_term', 'user_attr_or_none')
 
def user_attr_or_none(author, show_attr):
 
    """Try to match email part of VCS committer string with a local user and return show_attr
 
    - or return None if user not found"""
 
    email = author_email(author)
 
    if email:
 
        from kallithea.model.db import User
 
        user = User.get_by_email(email, cache=True) # cache will only use sql_cache_short
 
        if user is not None:
 
            return getattr(user, show_attr)
 
    return None
 

	
 

	
 
def email_or_none(author):
 
    """Try to match email part of VCS committer string with a local user.
 
    Return primary email of user, email part of the specified author name, or None."""
 
    if not author:
 
        return None
 
    email = user_attr_or_none(author, 'email')
 
    if email is not None:
 
        return email # always use user's main email address - not necessarily the one used to find user
 

	
 
    # extract email from the commit string
 
    email = author_email(author)
 
    if email:
 
        return email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author, show_attr="username"):
 
    """Find the user identified by 'author', return one of the users attributes,
 
    default to the username attribute, None if there is no user"""
 
    from kallithea.model.db import User
 
    # if author is already an instance use it for extraction
 
    if isinstance(author, User):
 
        return getattr(author, show_attr)
 

	
 
    value = user_attr_or_none(author, show_attr)
 
    if value is not None:
 
        return value
 

	
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return author_name(author) or email(author)
 

	
 

	
 
def person_by_id(id_, show_attr="username"):
 
    from kallithea.model.db import User
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = User.get(id_)
 
        if user is not None:
 
            return person_getter(user)
 
    return id_
 

	
 

	
 
def boolicon(value):
 
    """Returns boolean value of a value, represented as small html image of true/false
 
    icons
 

	
 
    :param value: value
 
    """
 

	
 
    if value:
 
        return HTML.tag('i', class_="icon-ok")
 
    else:
 
        return HTML.tag('i', class_="icon-minus-circled")
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        def lnk(rev, repo_name):
 
            lazy_cs = False
 
            title_ = None
 
            url_ = '#'
 
            if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
 
                if rev.op and rev.ref_name:
 
                    if rev.op == 'delete_branch':
 
                        lbl = _('Deleted branch: %s') % rev.ref_name
 
                    elif rev.op == 'tag':
 
                        lbl = _('Created tag: %s') % rev.ref_name
 
                    else:
 
                        lbl = 'Unknown operation %s' % rev.op
 
                else:
 
                    lazy_cs = True
 
                    lbl = rev.short_id[:8]
 
                    url_ = url('changeset_home', repo_name=repo_name,
 
                               revision=rev.raw_id)
 
            else:
 
                # changeset cannot be found - it might have been stripped or removed
 
                lbl = rev[:12]
 
                title_ = _('Changeset %s not found') % lbl
 
            if parse_cs:
 
                return link_to(lbl, url_, title=title_, **{'data-toggle': 'tooltip'})
 
            return link_to(lbl, url_, class_='lazy-cs' if lazy_cs else '',
 
                           **{'data-raw_id': rev.raw_id, 'data-repo_name': repo_name})
 

	
 
        def _get_op(rev_txt):
 
            _op = None
 
            _name = rev_txt
 
            if len(rev_txt.split('=>')) == 2:
 
                _op, _name = rev_txt.split('=>')
 
            return _op, _name
 

	
 
        revs = []
 
        if len([v for v in revs_ids if v != '']) > 0:
 
            repo = None
 
            for rev in revs_ids[:revs_top_limit]:
kallithea/templates/base/flash_msg.html
Show inline comments
 
<div class="flash_msg">
 
    <% messages = h.pop_flash_messages() %>
 
    % if messages:
 
        <% alert_categories = {'warning': 'alert-warning', 'notice': 'alert-info', 'error': 'alert-danger', 'success': 'alert-success'} %>
 
        % for message in messages:
 
            <div class="alert alert-dismissable ${alert_categories[message.category]}" role="alert">
 
              <button type="button" class="close" data-dismiss="alert" aria-hidden="true"><i class="icon-cancel-circled"></i></button>
 
              ${message}
 
              ${message.message|n}
 
            </div>
 
        % endfor
 
    % endif
 
    <script>
 
    if (typeof jQuery != 'undefined') {
 
        $(".alert").alert();
 
    }
 
    </script>
 
</div>
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
@@ -14,436 +14,436 @@
 

	
 
import pytest
 
from sqlalchemy.orm.exc import NoResultFound
 
from tg.util.webtest import test_context
 
from webob.exc import HTTPNotFound
 

	
 
from kallithea.controllers.admin.users import UsersController
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import check_password
 
from kallithea.model import validators
 
from kallithea.model.db import Permission, RepoGroup, User, UserApiKeys, UserSshKeys
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
@pytest.fixture
 
def user_and_repo_group_fail():
 
    username = 'repogrouperr'
 
    groupname = u'repogroup_fail'
 
    user = fixture.create_user(name=username)
 
    repo_group = fixture.create_repo_group(name=groupname, cur_user=username)
 
    yield user, repo_group
 
    # cleanup
 
    if RepoGroup.get_by_group_name(groupname):
 
        fixture.destroy_repo_group(repo_group)
 

	
 

	
 
class TestAdminUsersController(TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users'))
 
        # TODO: Test response...
 

	
 
    def test_create(self):
 
        self.log_user()
 
        username = 'newtestuser'
 
        password = 'test12'
 
        password_confirmation = password
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'mail@example.com'
 

	
 
        response = self.app.post(url('new_user'),
 
            {'username': username,
 
             'password': password,
 
             'password_confirmation': password_confirmation,
 
             'firstname': name,
 
             'active': True,
 
             'lastname': lastname,
 
             'extern_name': 'internal',
 
             'extern_type': 'internal',
 
             'email': email,
 
             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        # 302 Found
 
        # The resource was found at http://localhost/_admin/users/5/edit; you should be redirected automatically.
 

	
 
        self.checkSessionFlash(response, '''Created user %s''' % username)
 

	
 
        response = response.follow()
 
        response.mustcontain("""%s user settings""" % username) # in <title>
 

	
 
        new_user = Session().query(User). \
 
            filter(User.username == username).one()
 

	
 
        assert new_user.username == username
 
        assert check_password(password, new_user.password) == True
 
        assert new_user.name == name
 
        assert new_user.lastname == lastname
 
        assert new_user.email == email
 

	
 
    def test_create_err(self):
 
        self.log_user()
 
        username = 'new_user'
 
        password = ''
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'errmail.example.com'
 

	
 
        response = self.app.post(url('new_user'),
 
            {'username': username,
 
             'password': password,
 
             'name': name,
 
             'active': False,
 
             'lastname': lastname,
 
             'email': email,
 
             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        with test_context(self.app):
 
            msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
 
        msg = h.html_escape(msg % {'username': 'new_user'})
 
        response.mustcontain("""<span class="error-message">%s</span>""" % msg)
 
        response.mustcontain("""<span class="error-message">Please enter a value</span>""")
 
        response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
 

	
 
        def get_user():
 
            Session().query(User).filter(User.username == username).one()
 

	
 
        with pytest.raises(NoResultFound):
 
            get_user(), 'found user in database'
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_user'))
 

	
 
    @parametrize('name,attrs',
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
         ('extern_name', {'extern_name': 'test'}),
 
         ('extern_name', {'extern_name': None}),
 
         ('active', {'active': False}),
 
         ('active', {'active': True}),
 
         ('email', {'email': 'someemail@example.com'}),
 
        # ('new_password', {'new_password': 'foobar123',
 
        #                   'password_confirmation': 'foobar123'})
 
        ])
 
    def test_update(self, name, attrs):
 
        self.log_user()
 
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
 
                                  email='testme@example.com',
 
                                  extern_type='internal',
 
                                  extern_name=self.test_user_1,
 
                                  skip_if_exists=True)
 
        Session().commit()
 
        params = usr.get_api_data(True)
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update(attrs)
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            # cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            # cannot update this via form, expected value is original one
 
            params['extern_name'] = self.test_user_1
 
            # special case since this user is not logged in yet his data is
 
            # not filled so we use creation data
 

	
 
        params.update({'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response = self.app.post(url('update_user', id=usr.user_id), params)
 
        self.checkSessionFlash(response, 'User updated successfully')
 
        params.pop('_session_csrf_secret_token')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        assert params == updated_params
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        username = 'newtestuserdeleteme'
 

	
 
        fixture.create_user(name=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_err(self):
 
        self.log_user()
 
        username = 'repoerr'
 
        reponame = u'repoerr_fail'
 

	
 
        fixture.create_user(name=username)
 
        fixture.create_repo(name=reponame, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
        self.checkSessionFlash(response, 'User &quot;%s&quot; still '
 
                               'owns 1 repositories and cannot be removed. '
 
                               'Switch owners or remove those repositories: '
 
                               '%s' % (username, reponame))
 

	
 
        response = self.app.post(url('delete_repo', repo_name=reponame),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_group_err(self, user_and_repo_group_fail):
 
        new_user, repo_group = user_and_repo_group_fail
 
        username = new_user.username
 
        groupname = repo_group.group_name
 

	
 
        self.log_user()
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
        self.checkSessionFlash(response, 'User &quot;%s&quot; still '
 
                               'owns 1 repository groups and cannot be removed. '
 
                               'Switch owners or remove those repository groups: '
 
                               '%s' % (username, groupname))
 

	
 
        # Relevant _if_ the user deletion succeeded to make sure we can render groups without owner
 
        # rg = RepoGroup.get_by_group_name(group_name=groupname)
 
        # response = self.app.get(url('repos_groups', id=rg.group_id))
 

	
 
        response = self.app.post(url('delete_repo_group', group_name=groupname),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Removed repository group %s' % groupname)
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_user_group_err(self):
 
        self.log_user()
 
        username = 'usergrouperr'
 
        groupname = u'usergroup_fail'
 

	
 
        fixture.create_user(name=username)
 
        ug = fixture.create_user_group(name=groupname, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
        self.checkSessionFlash(response, 'User &quot;%s&quot; still '
 
                               'owns 1 user groups and cannot be removed. '
 
                               'Switch owners or remove those user groups: '
 
                               '%s' % (username, groupname))
 

	
 
        # TODO: why do this fail?
 
        #response = self.app.delete(url('delete_users_group', id=groupname))
 
        #self.checkSessionFlash(response, 'Removed user group %s' % groupname)
 

	
 
        fixture.destroy_user_group(ug.users_group_id)
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_edit(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        response = self.app.get(url('edit_user', id=user.user_id))
 

	
 
    def test_add_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_create) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(create_repo_perm=True,
 
                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == False
 
            assert UserModel().has_perm(uid, perm_create) == True
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_create) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == True
 
            assert UserModel().has_perm(uid, perm_create) == False
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_add_perm_fork_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.fork.none')
 
        perm_fork = Permission.get_by_key('hg.fork.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(create_repo_perm=True,
 
                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == False
 
            assert UserModel().has_perm(uid, perm_create) == True
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_fork_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.fork.none')
 
        perm_fork = Permission.get_by_key('hg.fork.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == True
 
            assert UserModel().has_perm(uid, perm_create) == False
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_ips(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_ips', id=user.user_id))
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    @parametrize('test_name,ip,ip_range,failure', [
 
        ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
 
        ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
 
        ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
 
        ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
 
        ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
 
        ('127_bad_ip', 'foobar', 'foobar', True),
 
    ])
 
    def test_add_ip(self, test_name, ip, ip_range, failure, auto_clear_ip_permissions):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ips_update', id=user_id),
 
                                 params=dict(new_ip=ip, _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        if failure:
 
            self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response.mustcontain(no=[ip])
 
            response.mustcontain(no=[ip_range])
 

	
 
        else:
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response.mustcontain(ip)
 
            response.mustcontain(ip_range)
 

	
 
    def test_delete_ip(self, auto_clear_ip_permissions):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        ip = '127.0.0.1/32'
 
        ip_range = '127.0.0.1 - 127.0.0.1'
 
        with test_context(self.app):
 
            new_ip = UserModel().add_extra_ip(user_id, ip)
 
            Session().commit()
 
        new_ip_id = new_ip.ip_id
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response.mustcontain(ip)
 
        response.mustcontain(ip_range)
 

	
 
        self.app.post(url('edit_user_ips_delete', id=user_id),
 
                      params=dict(del_ip_id=new_ip_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response.mustcontain('All IP addresses are allowed')
 
        response.mustcontain(no=[ip])
 
        response.mustcontain(no=[ip_range])
kallithea/tests/functional/test_files.py
Show inline comments
 
@@ -83,421 +83,421 @@ class TestFilesController(TestController
 
        for r in [(73, 'a066b25d5df7016b45a41b7e2a78c33b57adc235'),
 
                  (92, 'cc66b61b8455b264a7a8a2d8ddc80fcfc58c221e'),
 
                  (109, '75feb4c33e81186c87eac740cee2447330288412'),
 
                  (1, '3d8f361e72ab303da48d799ff1ac40d5ac37c67e'),
 
                  (0, 'b986218ba1c9b0d6a259fac9b050b1724ed8e545')]:
 

	
 
            response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision=r[1],
 
                                    f_path='/'))
 

	
 
            response.mustcontain("""@ r%s:%s""" % (r[0], r[1][:12]))
 

	
 
    def test_file_source(self):
 
        # Force the global cache to be populated now when we know the right .ini has been loaded.
 
        # (Without this, the test would fail.)
 
        import kallithea.lib.helpers
 
        kallithea.lib.helpers._urlify_issues_f = None
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='8911406ad776fdd3d0b9932a2e89677e57405a48',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        response.mustcontain("""<div class="formatted-fixed">Partially implemented <a class="issue-tracker-link" href="https://issues.example.com/vcs_test_hg/issue/16">#16</a>. filecontent/commit message/author/node name are safe_unicode now.<br/>"""
 
"""In addition some other __str__ are unicode as well<br/>"""
 
"""Added test for unicode<br/>"""
 
"""Improved test to clone into uniq repository.<br/>"""
 
"""removed extra unicode conversion in diff.</div>
 
""")
 

	
 
        response.mustcontain("""<option selected="selected" value="8911406ad776fdd3d0b9932a2e89677e57405a48">default at 8911406ad776</option>""")
 

	
 
    def test_file_source_history(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py'),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'},)
 
        assert json.loads(response.body) == json.loads(HG_NODE_HISTORY)
 

	
 
    def test_file_source_history_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py'),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'},)
 
        assert json.loads(response.body) == json.loads(GIT_NODE_HISTORY)
 

	
 
    def test_file_annotation(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate='1'))
 

	
 
        response.mustcontain("""r356:25213a5fbb04""")
 

	
 
    def test_file_annotation_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate='1'))
 
        response.mustcontain("""r345:c994f0de03b2""")
 

	
 
    def test_file_annotation_history(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate='1'),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 

	
 
        assert json.loads(response.body) == json.loads(HG_NODE_HISTORY)
 

	
 
    def test_file_annotation_history_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 

	
 
        assert json.loads(response.body) == json.loads(GIT_NODE_HISTORY)
 

	
 
    def test_file_authors(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='authors',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate='1'))
 
        response.mustcontain('Marcin Kuzminski')
 
        response.mustcontain('Lukasz Balcerzak')
 

	
 
    def test_file_authors_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='authors',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate='1'))
 
        response.mustcontain('Marcin Kuzminski')
 
        response.mustcontain('Lukasz Balcerzak')
 

	
 
    def test_archival(self):
 
        self.log_user()
 
        _set_downloads(HG_REPO, set_to=True)
 
        for arch_ext, info in ARCHIVE_SPECS.items():
 
            short = '27cd5cce30c9%s' % arch_ext
 
            fname = '27cd5cce30c96924232dffcd24178a07ffeb5dfc%s' % arch_ext
 
            filename = '%s-%s' % (HG_REPO, short)
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 

	
 
            assert response.status == '200 OK'
 
            heads = [
 
                ('Pragma', 'no-cache'),
 
                ('Cache-Control', 'no-cache'),
 
                ('Content-Disposition', 'attachment; filename=%s' % filename),
 
                ('Content-Type', info[0]),
 
            ]
 
            assert sorted(response.response._headers.items()) == sorted(heads)
 

	
 
    def test_archival_wrong_ext(self):
 
        self.log_user()
 
        _set_downloads(HG_REPO, set_to=True)
 
        for arch_ext in ['tar', 'rar', 'x', '..ax', '.zipz']:
 
            fname = '27cd5cce30c96924232dffcd24178a07ffeb5dfc%s' % arch_ext
 

	
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 
            response.mustcontain('Unknown archive type')
 

	
 
    def test_archival_wrong_revision(self):
 
        self.log_user()
 
        _set_downloads(HG_REPO, set_to=True)
 
        for rev in ['00x000000', 'tar', 'wrong', '@##$@$42413232', '232dffcd']:
 
            fname = '%s.zip' % rev
 

	
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 
            response.mustcontain('Unknown revision')
 

	
 
    #==========================================================================
 
    # RAW FILE
 
    #==========================================================================
 
    def test_raw_file_ok(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        assert response.content_disposition == "attachment; filename=nodes.py"
 
        assert response.content_type == mimetypes.guess_type("nodes.py")[0]
 

	
 
    def test_raw_file_wrong_cs(self):
 
        self.log_user()
 
        rev = u'ERRORce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/nodes.py'
 

	
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path), status=404)
 

	
 
        msg = """Such revision does not exist for this repository"""
 
        response.mustcontain(msg)
 

	
 
    def test_raw_file_wrong_f_path(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/ERRORnodes.py'
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path), status=404)
 

	
 
        msg = "There is no file nor directory at the given path: &#39;%s&#39; at revision %s" % (f_path, rev[:12])
 
        msg = "There is no file nor directory at the given path: &apos;%s&apos; at revision %s" % (f_path, rev[:12])
 
        response.mustcontain(msg)
 

	
 
    #==========================================================================
 
    # RAW RESPONSE - PLAIN
 
    #==========================================================================
 
    def test_raw_ok(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        assert response.content_type == "text/plain"
 

	
 
    def test_raw_wrong_cs(self):
 
        self.log_user()
 
        rev = u'ERRORcce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/nodes.py'
 

	
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path), status=404)
 

	
 
        msg = """Such revision does not exist for this repository"""
 
        response.mustcontain(msg)
 

	
 
    def test_raw_wrong_f_path(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/ERRORnodes.py'
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path), status=404)
 
        msg = "There is no file nor directory at the given path: &#39;%s&#39; at revision %s" % (f_path, rev[:12])
 
        msg = "There is no file nor directory at the given path: &apos;%s&apos; at revision %s" % (f_path, rev[:12])
 
        response.mustcontain(msg)
 

	
 
    def test_ajaxed_files_list(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        response = self.app.get(
 
            url('files_nodelist_home', repo_name=HG_REPO, f_path='/',
 
                revision=rev),
 
            extra_environ={'HTTP_X_PARTIAL_XHR': '1'},
 
        )
 
        response.mustcontain("vcs/web/simplevcs/views/repository.py")
 

	
 
    # Hg - ADD FILE
 
    def test_add_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'))
 

	
 
    def test_add_file_into_hg_missing_content(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': '',
 
                                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No content')
 

	
 
    def test_add_file_into_hg_missing_filename(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No filename')
 

	
 
    @parametrize('location,filename', [
 
        ('/abs', 'foo'),
 
        ('../rel', 'foo'),
 
        ('file/../foo', 'foo'),
 
    ])
 
    def test_add_file_into_hg_bad_filenames(self, location, filename):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'Location must be relative path and must not contain .. in path')
 

	
 
    @parametrize('cnt,location,filename', [
 
        (1, '', 'foo.txt'),
 
        (2, 'dir', 'foo.rst'),
 
        (3, 'rel/dir', 'foo.bar'),
 
    ])
 
    def test_add_file_into_hg(self, cnt, location, filename):
 
        self.log_user()
 
        repo = fixture.create_repo(u'commit-test-%s' % cnt, repo_type='hg')
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - add file
 
    def test_add_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'))
 

	
 
    def test_add_file_into_git_missing_content(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                     'content': '',
 
                                     '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 
        self.checkSessionFlash(response, 'No content')
 

	
 
    def test_add_file_into_git_missing_filename(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No filename')
 

	
 
    @parametrize('location,filename', [
 
        ('/abs', 'foo'),
 
        ('../rel', 'foo'),
 
        ('file/../foo', 'foo'),
 
    ])
 
    def test_add_file_into_git_bad_filenames(self, location, filename):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'Location must be relative path and must not contain .. in path')
 

	
 
    @parametrize('cnt,location,filename', [
 
        (1, '', 'foo.txt'),
 
        (2, 'dir', 'foo.rst'),
 
        (3, 'rel/dir', 'foo.bar'),
 
    ])
 
    def test_add_file_into_git(self, cnt, location, filename):
 
        self.log_user()
 
        repo = fixture.create_repo(u'commit-test-%s' % cnt, repo_type='git')
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Hg - EDIT
 
    def test_edit_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_edit_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_edit_file_view_not_on_branch_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
kallithea/tests/functional/test_pullrequests.py
Show inline comments
 
import re
 

	
 
import pytest
 

	
 
from kallithea.controllers.pullrequests import PullrequestsController
 
from kallithea.model.db import PullRequest, User
 
from kallithea.model.meta import Session
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestPullrequestsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='pullrequests', action='index',
 
                                    repo_name=HG_REPO))
 

	
 
    def test_create_trivial(self):
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:stable:4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 
        response = response.follow()
 
        assert response.status == '200 OK'
 
        response.mustcontain('Successfully opened new pull request')
 
        response.mustcontain('No additional changesets found for iterating on this pull request')
 
        response.mustcontain('href="/vcs_test_hg/changeset/4f7e2131323e0749a740c0a56ab68ae9269c562a"')
 

	
 
    def test_available(self):
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'rev:94f45ed825a1:94f45ed825a113e61af7e141f44ca578374abef0',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 
        response = response.follow()
 
        assert response.status == '200 OK'
 
        response.mustcontain(no='No additional changesets found for iterating on this pull request')
 
        response.mustcontain('The following additional changes are available on stable:')
 
        response.mustcontain('<input id="updaterev_4f7e2131323e0749a740c0a56ab68ae9269c562a" name="updaterev" type="radio" value="4f7e2131323e0749a740c0a56ab68ae9269c562a" />')
 
        response.mustcontain('href="/vcs_test_hg/changeset/4f7e2131323e0749a740c0a56ab68ae9269c562a"') # as update
 

	
 
    def test_range(self):
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:stable:4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'rev:94f45ed825a1:94f45ed825a113e61af7e141f44ca578374abef0',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 
        response = response.follow()
 
        assert response.status == '200 OK'
 
        response.mustcontain('No additional changesets found for iterating on this pull request')
 
        response.mustcontain('href="/vcs_test_hg/changeset/4f7e2131323e0749a740c0a56ab68ae9269c562a"')
 

	
 
    def test_update_reviewers(self):
 
        self.log_user()
 
        regular_user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        regular_user2 = User.get_by_username(TEST_USER_REGULAR2_LOGIN)
 
        admin_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 

	
 
        # create initial PR
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'rev:94f45ed825a1:94f45ed825a113e61af7e141f44ca578374abef0',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 
        pull_request1_id = re.search(r'/pull-request/(\d+)/', response.location).group(1)
 
        assert response.location == 'http://localhost/%s/pull-request/%s/_/stable' % (HG_REPO, pull_request1_id)
 

	
 
        # create new iteration
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request1_id),
 
                                 {
 
                                  'updaterev': '4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                  'review_members': [regular_user.user_id],
 
                                 },
 
                                 status=302)
 
        pull_request2_id = re.search(r'/pull-request/(\d+)/', response.location).group(1)
 
        assert pull_request2_id != pull_request1_id
 
        assert response.location == 'http://localhost/%s/pull-request/%s/_/stable' % (HG_REPO, pull_request2_id)
 
        response = response.follow()
 
        # verify reviewer was added
 
        response.mustcontain('<input type="hidden" value="%s" name="review_members" />' % regular_user.user_id)
 

	
 
        # update without creating new iteration
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request2_id),
 
                                 {
 
                                  'pullrequest_title': 'Title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                  'org_review_members': [admin_user.user_id], # fake - just to get some 'meanwhile' warning ... but it is also added ...
 
                                  'review_members': [regular_user2.user_id, admin_user.user_id],
 
                                 },
 
                                 status=302)
 
        assert response.location == 'http://localhost/%s/pull-request/%s/_/stable' % (HG_REPO, pull_request2_id)
 
        response = response.follow()
 
        # verify reviewers were added / removed
 
        response.mustcontain('Meanwhile, the following reviewers have been added: test_regular')
 
        response.mustcontain('Meanwhile, the following reviewers have been removed: test_admin')
 
        response.mustcontain('<input type="hidden" value="%s" name="review_members" />' % regular_user.user_id)
 
        response.mustcontain('<input type="hidden" value="%s" name="review_members" />' % regular_user2.user_id)
 
        response.mustcontain(no='<input type="hidden" value="%s" name="review_members" />' % admin_user.user_id)
 

	
 
    def test_update_with_invalid_reviewer(self):
 
        invalid_user_id = 99999
 
        self.log_user()
 
        # create a valid pull request
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {
 
                                  'org_repo': HG_REPO,
 
                                  'org_ref': 'rev:94f45ed825a1:94f45ed825a113e61af7e141f44ca578374abef0',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                status=302)
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search(r'/pull-request/(\d+)/', response.location)
 
        assert m is not None
 
        pull_request_id = m.group(1)
 

	
 
        # update it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
                                  'updaterev': '4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                  'review_members': [str(invalid_user_id)],
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_id)
 
        response.mustcontain('Invalid reviewer &quot;%s&quot; specified' % invalid_user_id)
 

	
 
    def test_edit_with_invalid_reviewer(self):
 
        invalid_user_id = 99999
 
        self.log_user()
 
        # create a valid pull request
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {
 
                                  'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:stable:4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                status=302)
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search(r'/pull-request/(\d+)/', response.location)
 
        assert m is not None
 
        pull_request_id = m.group(1)
 

	
 
        # edit it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                  'review_members': [str(invalid_user_id)],
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_id)
 
        response.mustcontain('Invalid reviewer &quot;%s&quot; specified' % invalid_user_id)
 

	
 
    def test_iteration_refs(self):
 
        # Repo graph excerpt:
 
        #   o   fb95b340e0d0 webvcs
 
        #  /:
 
        # o :   41d2568309a0 default
 
        # : :
 
        # : o   5ec21f21aafe webvcs
 
        # : :
 
        # : o   9e6119747791 webvcs
 
        # : :
 
        # o :   3d1091ee5a53 default
 
        # :/
 
        # o     948da46b29c1 default
 

	
 
        self.log_user()
 

	
 
        # create initial PR
 
        response = self.app.post(
 
            url(controller='pullrequests', action='create', repo_name=HG_REPO),
 
            {
 
                'org_repo': HG_REPO,
 
                'org_ref': 'rev:9e6119747791:9e6119747791ff886a5abe1193a730b6bf874e1c',
 
                'other_repo': HG_REPO,
 
                'other_ref': 'branch:default:3d1091ee5a533b1f4577ec7d8a226bb315fb1336',
 
                'pullrequest_title': 'title',
 
                'pullrequest_desc': 'description',
 
                '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
            },
 
            status=302)
 
        pr1_id = int(re.search(r'/pull-request/(\d+)/', response.location).group(1))
 
        pr1 = PullRequest.get(pr1_id)
 

	
 
        assert pr1.org_ref == 'branch:webvcs:9e6119747791ff886a5abe1193a730b6bf874e1c'
 
        assert pr1.other_ref == 'branch:default:948da46b29c125838a717f6a8496eb409717078d'
 

	
 
        Session().rollback() # invalidate loaded PR objects before issuing next request.
 

	
 
        # create PR 2 (new iteration with same ancestor)
 
        response = self.app.post(
 
            url(controller='pullrequests', action='post', repo_name=HG_REPO, pull_request_id=pr1_id),
 
            {
 
                'updaterev': '5ec21f21aafe95220f1fc4843a4a57c378498b71',
 
                'pullrequest_title': 'title',
 
                'pullrequest_desc': 'description',
 
                'owner': TEST_USER_REGULAR_LOGIN,
 
                '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
             },
 
             status=302)
 
        pr2_id = int(re.search(r'/pull-request/(\d+)/', response.location).group(1))
 
        pr1 = PullRequest.get(pr1_id)
 
        pr2 = PullRequest.get(pr2_id)
 

	
 
        assert pr2_id != pr1_id
 
        assert pr1.status == PullRequest.STATUS_CLOSED
 
        assert pr2.org_ref == 'branch:webvcs:5ec21f21aafe95220f1fc4843a4a57c378498b71'
 
        assert pr2.other_ref == pr1.other_ref
 

	
 
        Session().rollback() # invalidate loaded PR objects before issuing next request.
 

	
 
        # create PR 3 (new iteration with new ancestor)
 
        response = self.app.post(
 
            url(controller='pullrequests', action='post', repo_name=HG_REPO, pull_request_id=pr2_id),
 
            {
 
                'updaterev': 'fb95b340e0d03fa51f33c56c991c08077c99303e',
 
                'pullrequest_title': 'title',
 
                'pullrequest_desc': 'description',
 
                'owner': TEST_USER_REGULAR_LOGIN,
 
                '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
             },
 
             status=302)
 
        pr3_id = int(re.search(r'/pull-request/(\d+)/', response.location).group(1))
 
        pr2 = PullRequest.get(pr2_id)
 
        pr3 = PullRequest.get(pr3_id)
 

	
 
        assert pr3_id != pr2_id
 
        assert pr2.status == PullRequest.STATUS_CLOSED
 
        assert pr3.org_ref == 'branch:webvcs:fb95b340e0d03fa51f33c56c991c08077c99303e'
 
        assert pr3.other_ref == 'branch:default:41d2568309a05f422cffb8008e599d385f8af439'
 

	
 

	
 
@pytest.mark.usefixtures("test_context_fixture") # apply fixture for all test methods
 
class TestPullrequestsGetRepoRefs(TestController):
 

	
 
    def setup_method(self, method):
 
        self.repo_name = u'main'
 
        repo = fixture.create_repo(self.repo_name, repo_type='hg')
 
        self.repo_scm_instance = repo.scm_instance
 
        Session().commit()
 
        self.c = PullrequestsController()
 

	
 
    def teardown_method(self, method):
 
        fixture.destroy_repo(u'main')
 
        Session().commit()
 
        Session.remove()
 

	
 
    def test_repo_refs_empty_repo(self):
 
        # empty repo with no commits, no branches, no bookmarks, just one tag
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance)
 
        assert default == 'tag:null:0000000000000000000000000000000000000000'
 

	
 
    def test_repo_refs_one_commit_no_hints(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance)
 
        assert default == 'branch:default:%s' % cs0.raw_id
 
        assert ([('branch:default:%s' % cs0.raw_id, 'default (current tip)')],
 
                'Branches') in refs
 

	
 
    def test_repo_refs_one_commit_rev_hint(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance, rev=cs0.raw_id)
 
        expected = 'branch:default:%s' % cs0.raw_id
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_two_commits_no_hints(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance)
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_two_commits_rev_hints(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance, rev=cs0.raw_id)
 
        expected = 'rev:%s:%s' % (cs0.raw_id, cs0.raw_id)
 
        assert default == expected
 
        assert ([(expected, 'Changeset: %s' % cs0.raw_id[0:12])], 'Special') in refs
 
        assert ([('branch:default:%s' % cs1.raw_id, 'default (current tip)')], 'Branches') in refs
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance, rev=cs1.raw_id)
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_two_commits_branch_hint(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance, branch='default')
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_one_branch_no_hints(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        # TODO
0 comments (0 inline, 0 general)